summaryrefslogtreecommitdiff
path: root/src/frag_monitor.c
diff options
context:
space:
mode:
authorlishu <[email protected]>2018-11-30 13:04:50 +0800
committerlishu <[email protected]>2018-11-30 13:04:50 +0800
commitc9ea9d4c57745bd3eb6375f802508428f864120e (patch)
tree85336b0df0ddf7d166144ad4eaaebf58fe4db197 /src/frag_monitor.c
parent69965fc7d12a4e77d758c9e83232f9fbdab6e634 (diff)
2018.11.12 1. 1. set K_PROJECT because different maat table
Diffstat (limited to 'src/frag_monitor.c')
-rw-r--r--src/frag_monitor.c1560
1 files changed, 1560 insertions, 0 deletions
diff --git a/src/frag_monitor.c b/src/frag_monitor.c
new file mode 100644
index 0000000..2f27038
--- /dev/null
+++ b/src/frag_monitor.c
@@ -0,0 +1,1560 @@
+//author: lishu
+//date: 2015-9-23
+
+
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <math.h>
+#include <net/if.h>
+#include <strings.h>
+#include <stdio.h>
+#include <stdio.h>
+#include <assert.h>
+#include <ctype.h>
+
+#include "MESA_handle_logger.h"
+#include "MESA_htable.h"
+#include "MESA_prof_load.h"
+#include "MESA_trace.h"
+#include "Maat_rule.h"
+#include "wired_cfg.h"
+#include "field_stat2.h"
+#include "http.h"
+#include "AV_feedback.h"
+#include "AV_charater.h"
+#include "AV_prog_table.h"
+#include "AV_log.h"
+#include "AV_kill_connection.h"
+#include "cJSON.h"
+#include "KafkaProducer.h"
+
+#include "appdsender.h"
+#include "opt.h"
+#include "opt_in.h"
+#include "optregister.h"
+#include "frag_monitor.h"
+#include "frag_common.h"
+#include "frag_block.h"
+
+int FRAG_MONITOR_VERSION_2_0_20181112 = 0;
+void frag_monitor_history()
+{
+ //2015.09.23 create the project
+ //2016.05.04 pid add tcp seq
+ //2016.05.09 add send switch, add static
+ //2016.05.11 add mediainfo layer_addr
+ //2016.05.13 proc_frgmnt_hit log
+ //2016.08.16 content-type sendback
+ //2016.08.26 ������������߳�����ʶ��
+ //2016.11.02 302�����ݲ�������Ƶ����������
+ //2017.01.03 mesa_tcp lock, ʹ��field_stateͳ��
+ //2017.02.21 support app
+ //2017.02.24 add switch to close log
+ //2017.03.02 cache index info and sendto kafka
+ //2017.03.22 JSON: ISN ADDR_LIST PROTOCOL PROXY -1 to 48 bit
+ //2017.04.07 1. log thread safe 2. media_type not protocol
+ //2017.04.11 1. add topic stat
+ //2017.04.12 1. tuple7
+ //2017.04.17 1. cJSON_print free
+ //2017.04.25 1. HLS OSMF protocol as HTTP
+ //2017.05.03 1. index gzip static
+ //2017.05.15 1. OSMF feature change to f4f 2. hls tpl
+ //2017.05.24 1. frag_fd
+ //2017.07.13 1. add similar scan
+ //2017.09.06 1. delete c2s_content 2. time
+ //2017.09.11 1. reconstruction
+ //2017.09.23 1. test
+ //2017.09.25 1. MEDIA_CNVG_TOPIC 2. delete .inf 3.hls_data_s2c
+ //2017.09.29 1. change index->data when double dir 2. frag_monitor.conf 3. frag_report_switch
+ //2017.11.14 1. add frag_flag in http session
+ //2017.11.30 1. add ts_data feature
+ //2017.12.08 1. modify the method that tuple7 generate pid
+ //2018.01.12 1. field_stat monitor 2. add url_key opt when send meta
+ //2018.01.29 1. use kafka_handle defined by av_master
+ //2018.03.15 1. support other type
+ //2018.03.16 1. add frag notice
+ //2018.05.09 1. add frag_json_report, addr+scan server_ip
+ //2018.07.10 1. 1. refer send to kafka 2. close frag forecast
+ //2018.07.24 1. 1. create maat feather
+ //2018.11.12 1. 1. set K_PROJECT because different maat table
+}
+
+frag_monitor_runtime_parameter_t g_frag_prog_para;
+/*sapp threadnum*/
+extern int g_iThreadNum;
+/*av_master*/
+//extern KafkaProducer* g_multi_kafka_producer;
+extern rd_kafka_t* g_multi_kafka_producer;
+/*av master*/
+extern Maat_feather_t g_AV_global_feather;
+
+const char g_app_feature[4] = {0X50, 0X4B, 0X03, 0X04};
+const char g_osmf_feature[4] = {0X6D, 0x64, 0x61, 0x74};
+
+const char* g_log_des[FRAG_LOG_NUM] =
+{
+ "index",
+ "rssb_meta",
+ "rssb_zero"
+};
+
+const char* hash_eliminate_type[3] =
+{
+ "",
+ "ELIMINATE_TYPE_NUM",
+ "ELIMINATE_TYPE_TIME"
+};
+
+void frag_write_to_log(int type, frag_info_t* frag, http_infor* a_http,struct streaminfo *a_tcp, int thread_seq)
+{
+ if(!g_frag_prog_para.log_switch) return;
+ char buf[2048] = {0};
+ int buflen = 0;
+ time_t cur_time;
+ struct tm now;
+ char now_time[32] = {0};
+ char day_time[32] = {0};
+ char filename[MAX_PATH_LEN] = {0};
+
+ time(&cur_time);
+ localtime_r(&cur_time, &now);
+ strftime(now_time, sizeof(now_time), "%Y-%m-%d %H:%M:%S", &now);
+
+ /*init*/
+ if(NULL==g_frag_prog_para.fraginfo_file[thread_seq])
+ {
+ strftime(day_time, sizeof(day_time), "%Y-%m-%d", &now);
+ snprintf(filename, sizeof(filename), "%s/%s_%d.%s", g_frag_prog_para.log_path, "fraginfo" ,thread_seq,day_time);
+ g_frag_prog_para.fraginfo_file[thread_seq] = fopen(filename, "a+");
+ localtime_r(&cur_time, &g_frag_prog_para.fraginfo_filetime[thread_seq]);
+ }
+
+ if(now.tm_mday!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_mday || now.tm_mon!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_mon || now.tm_year!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_year)
+ {
+ if(g_frag_prog_para.fraginfo_file[thread_seq])
+ {
+ strftime(day_time, sizeof(day_time), "%Y-%m-%d", &now);
+ snprintf(filename, sizeof(filename), "%s/%s_%d.%s", g_frag_prog_para.log_path, "fraginfo" ,thread_seq,day_time);
+ fclose(g_frag_prog_para.fraginfo_file[thread_seq]);
+ g_frag_prog_para.fraginfo_file[thread_seq] = fopen(filename, "a+");
+ }
+ localtime_r(&cur_time, &g_frag_prog_para.fraginfo_filetime[thread_seq]);
+ }
+
+ if(NULL!=frag->frag_opt[FRAG_URL].opt_value)
+ {
+ if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value)
+ {
+ buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s url:%s s2c_content_type:%s\n",
+ now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_URL].opt_value,frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value);
+ }
+ else
+ {
+ buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s url:%s\n",
+ now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_URL].opt_value);
+ }
+ }
+ else
+ {
+ if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value)
+ {
+ buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s s2c_content_type:%s\n",
+ now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value);
+ }
+ else
+ {
+ buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s\n",
+ now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7);
+ }
+ }
+
+ if(NULL!=g_frag_prog_para.fraginfo_file[thread_seq])
+ {
+ fwrite(buf, strlen(buf),1, g_frag_prog_para.fraginfo_file[thread_seq]);
+ fflush(g_frag_prog_para.fraginfo_file[thread_seq]);
+ }
+}
+
+void index_local_log(const char* filename, char* buf, uint32 buflen)
+{
+ FILE *pFile;
+ if ((pFile = fopen(filename, "a"))!=NULL)
+ {
+ fwrite(buf, buflen, 1, pFile);
+ fclose(pFile);
+ }
+}
+
+void frag_json_report(int local_log_type, frag_info_t* frag, http_infor* a_http,struct streaminfo *a_tcp, int thread_seq)
+{
+ AV_info_t p;
+ opt_unit_t* opt =NULL;
+ int opt_num = 0;
+
+ p.a_stream = a_tcp;
+ p.pid = frag->tuple7_id;
+ if(frag->frag_opt[FRAG_URL].opt_value!=NULL)
+ {
+ p.url = frag->frag_opt[FRAG_URL].opt_value;
+ }
+ else
+ {
+ p.url = NULL;
+ }
+ p.length = a_http->cont_length;
+ p.protocol = frag->proto;
+
+ /*referer*/
+ if(frag->frag_opt[FRAG_REFERER].opt_value!=NULL)
+ {
+ opt = (opt_unit_t*)dictator_malloc(thread_seq, sizeof(opt_unit_t));
+ opt->opt_len = frag->frag_opt[FRAG_REFERER].opt_len;
+ opt->opt_type = META_OPT_REFERER;
+ opt->opt_value = frag->frag_opt[FRAG_REFERER].opt_value;
+ opt_num ++;
+ }
+ AV_local_log_convert_json(&p,local_log_type,opt,opt_num);
+ if(opt!=NULL)
+ {
+ dictator_free(thread_seq, opt);
+ }
+}
+
+//void frgmnt_index_sendback_more(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq)
+void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ cJSON* root;
+ char* out = NULL;
+ uint32_t out_len = 0;
+ char ipbuf[32] = {0};
+ int ipbuf_len = 32;
+ string topic_name;
+ int cb_ret = 0;
+ char pid_buf[64] = {0};
+
+ root = cJSON_CreateObject();
+
+ /*topic*/
+ if(0==frag->data_dir)
+ {
+ cJSON_AddStringToObject(root, "topic", TOPIC_MEDIA_INDEX_DATA);
+ topic_name = TOPIC_MEDIA_INDEX_DATA;
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[INDEX_TOPIC], 0, FS_OP_ADD, 1);
+ }
+ else
+ {
+ cJSON_AddStringToObject(root, "topic",TOPIC_MEDIA_CONVERGE_DATA);
+ topic_name = TOPIC_MEDIA_CONVERGE_DATA;
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[CNVG_TOPIC], 0, FS_OP_ADD, 1);
+ }
+
+ /*service_id*/
+ cJSON_AddNumberToObject(root, "service_id", frag->service_id);
+ /*capIP*/
+ inet_ntop(AF_INET, &g_frag_prog_para.local_ip_nr, ipbuf, ipbuf_len);
+ cJSON_AddStringToObject(root, "cap_ip", ipbuf);
+ /*tuple7=pid !!!!PID�����ּ���JSON�����*/
+ snprintf(pid_buf, sizeof(pid_buf), "%llu", frag->tuple7_id);
+ cJSON_AddStringToObject(root, "pid", pid_buf);
+ /*dir*/
+ cJSON_AddNumberToObject(root, "dir", a_tcp->dir);
+ /*curdir*/
+ cJSON_AddNumberToObject(root, "curdir", a_http->curdir);
+ /*url*/
+ if(NULL!=frag->frag_opt[FRAG_URL].opt_value)
+ {
+ cJSON_AddStringToObject(root, "url", frag->frag_opt[FRAG_URL].opt_value);
+ }
+ /*ua*/
+ if(NULL!=frag->frag_opt[FRAG_UA].opt_value)
+ {
+ cJSON_AddStringToObject(root, "user_agent", frag->frag_opt[FRAG_UA].opt_value);
+ }
+ /*referer*/
+ if(NULL!=frag->frag_opt[FRAG_REFERER].opt_value)
+ {
+ cJSON_AddStringToObject(root, "referer", frag->frag_opt[FRAG_REFERER].opt_value);
+ }
+ /*s2c_content*/
+ if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT].opt_value)
+ {
+ frag->frag_opt[FRAG_S2C_CONTENT].opt_value = (char*)dictator_realloc(thread_seq, frag->frag_opt[FRAG_S2C_CONTENT].opt_value, frag->frag_opt[FRAG_S2C_CONTENT].opt_len+1);
+ frag->frag_opt[FRAG_S2C_CONTENT].opt_value[frag->frag_opt[FRAG_S2C_CONTENT].opt_len] = '\0';
+ frag->frag_opt[FRAG_S2C_CONTENT].opt_len += 1;
+ cJSON_AddStringToObject(root, "s2c_content", frag->frag_opt[FRAG_S2C_CONTENT].opt_value);
+ }
+ out = cJSON_Print(root);
+ out_len = strlen(out);
+ if(MESA_trace_match_addr(g_frag_prog_para.addr_trace, (trace_layer_addr*)(&a_tcp->addr)))
+ {
+ const char* addr = NULL;
+ char filename[MAX_PATH_LEN] = {0};
+ addr = printaddr(&a_tcp->addr, thread_seq);
+ snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq);
+ index_local_log(filename, out, out_len);
+ }
+ /*kfaka*/
+ if(NULL!=g_multi_kafka_producer)
+ {
+ //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len);
+ cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL);
+
+ }
+ if(out)
+ {
+ free(out);
+ }
+ cJSON_Delete(root);
+}
+
+/*
+void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ for(int i=0;i<10000;i++)
+ {
+ frgmnt_index_sendback_more(frag,a_http,a_tcp,thread_seq);
+ }
+}
+*/
+void create_media_opt(media_opt_info_t** media_optinfo, uchar opt_type, char* opt_value, uint32 opt_len, int thread_seq)
+{
+ media_opt_info_t* node = *media_optinfo;
+ if(node==NULL)
+ {
+ node = (media_opt_info_t*)dictator_malloc(thread_seq, sizeof(media_opt_info_t));
+ node->opt = NULL;
+ node->opt_content_id = -1;
+ node->opt_num = 0;
+ }
+ int opt_num = node->opt_num;
+ node->opt = (opt_unit_t*)dictator_realloc(thread_seq, (void*)node->opt, sizeof(struct opt_unit_t)*(1+opt_num));
+ memset((void*)&node->opt[opt_num],0,sizeof(struct opt_unit_t));
+ node->opt[opt_num].opt_value = (char*)dictator_malloc(thread_seq, opt_len);
+ memcpy(node->opt[opt_num].opt_value,opt_value,opt_len);
+ node->opt[opt_num].opt_type = opt_type;
+ node->opt[opt_num].opt_len = opt_len;
+ node->opt_num += 1;
+ (*media_optinfo) = node;
+}
+
+void destroy_media_opt(media_opt_info_t** media_optinfo, int thread_seq)
+{
+ if((*media_optinfo)->opt!=NULL)
+ {
+ for(int i=0;i<(*media_optinfo)->opt_num;i++)
+ {
+ if((*media_optinfo)->opt[i].opt_value!=NULL)
+ {
+ dictator_free(thread_seq, (*media_optinfo)->opt[i].opt_value);
+ (*media_optinfo)->opt[i].opt_value = NULL;
+ }
+ }
+ dictator_free(thread_seq, (*media_optinfo)->opt);
+ (*media_optinfo)->opt = NULL;
+ }
+ dictator_free(thread_seq, *media_optinfo);
+ *media_optinfo = NULL;
+}
+
+void append_media_opt(media_opt_info_t* media_optinfo, uchar opt_id, char* opt_value, uint32 opt_len, int thread_seq)
+{
+ (media_optinfo)->opt[opt_id].opt_value = (char*)dictator_realloc(thread_seq, (media_optinfo)->opt[opt_id].opt_value,
+ (media_optinfo)->opt[opt_id].opt_len+opt_len);
+ memcpy((media_optinfo)->opt[opt_id].opt_value+(media_optinfo)->opt[opt_id].opt_len,opt_value,opt_len);
+ (media_optinfo)->opt[opt_id].opt_len += opt_len;
+}
+
+int ts_data_check(char* data, uint32 data_len)
+{
+ return 0;
+}
+
+uchar frgmnt_av_sendback(frag_info_t* frag, char* data, uint32 data_len, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ char DST = AV_DST_NORMAL;
+ char query_url_key[1024] = {0};
+ int gen_key = 0;
+
+ char serviceid[256] = {0};
+ snprintf(serviceid,sizeof(serviceid),"%d",frag->service_id);
+
+ /*�ر�ģ�HLS .ts�����ݻỰ����һ���ֽ�Ϊ0x47*/
+ /*����Ҫ�ж�range����Ƭ�������ٷ��߳�*/
+ /*����Ҫ����lostlen�� lost֮���޷����룬ֱ�Ӳ��ش�*/
+ if(frag->service_id==SERVICE_HLS_DATA && frag->cont_offset==0)
+ {
+ if(*data!=0x47)
+ {
+ return PROT_STATE_DROPME;
+ }
+ }
+
+ /*ȷ��frag*/
+ project_req_add_char(a_tcp, g_frag_prog_para.frag_project_id, 1);
+
+ /*select DST*/
+ switch(frag->media_type)
+ {
+ case FILE_IOS:
+ case FILE_ANDRIOD:
+ case FILE_APP:
+ DST = AV_DST_APP;
+ break;
+
+ default:
+ DST = AV_DST_FRAGMENT_QUERY;
+ break;
+ }
+
+ /*media_info*/
+ if(!(frag->send_meta_flag))
+ {
+ frag->av_info = (av_info_t*)dictator_malloc(thread_seq, sizeof(av_info_t));
+ memset(frag->av_info,0,sizeof(av_info_t));
+
+ /*PID*/
+ frag->av_info->media_info.pid = frag->tuple7_id;
+ /*protocol*/
+ frag->av_info->media_info.protocol = frag->proto;
+ /*media_type*/
+ frag->av_info->media_info.media_type = frag->media_type;
+ /*media_len*/
+ if(a_http->cont_range!=NULL)
+ {
+ frag->av_info->media_info.prog_len = a_http->cont_range->len;
+ frag->cont_offset = a_http->cont_range->start;
+ }
+ else if(a_http->cont_length!=0)
+ {
+ frag->av_info->media_info.prog_len = a_http->cont_length;
+ }
+ else
+ {
+ frag->av_info->media_info.prog_len = MAX_UINT64_VAL;
+ }
+
+ /*serviceID*/
+ if(0!=frag->service_id)
+ {
+ create_media_opt(&(frag->av_info->media_optinfo),META_OPT_SERVICE_ID, serviceid, strlen(serviceid),thread_seq);
+ }
+ /*addr*/
+ create_media_opt(&(frag->av_info->media_optinfo),META_OPT_LAYER_ADDR, (char*)(&a_tcp->addr), sizeof(a_tcp->addr), thread_seq);
+ /*url*/
+ if(NULL!=frag->frag_opt[FRAG_URL].opt_value)
+ {
+ create_media_opt(&(frag->av_info->media_optinfo),META_OPT_TYPE_URL, frag->frag_opt[FRAG_URL].opt_value, frag->frag_opt[FRAG_URL].opt_len-1,thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[URL_META], 0, FS_OP_ADD, 1);
+ }
+ /*��������׼key*/
+ else
+ {
+ gen_key = append_http_query_key(a_tcp, a_http->http_session_seq, query_url_key, sizeof(query_url_key));
+ if(gen_key >= 0)
+ {
+ create_media_opt(&(frag->av_info->media_optinfo), META_OPT_SINGLE_KEY, query_url_key, strlen(query_url_key),thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[KEY_META], 0, FS_OP_ADD, 1);
+ }
+ }
+ /*ua*/
+ if(NULL!=frag->frag_opt[FRAG_UA].opt_value)
+ {
+ create_media_opt(&(frag->av_info->media_optinfo),META_OPT_USER_AGENT, frag->frag_opt[FRAG_UA].opt_value, frag->frag_opt[FRAG_UA].opt_len-1,thread_seq);
+ }
+ /*referer*/
+ if(NULL!=frag->frag_opt[FRAG_REFERER].opt_value)
+ {
+ create_media_opt(&(frag->av_info->media_optinfo),META_OPT_REFERER, frag->frag_opt[FRAG_REFERER].opt_value, frag->frag_opt[FRAG_REFERER].opt_len-1,thread_seq);
+ }
+ if(NULL!=frag->av_info->media_optinfo)
+ {
+ AV_send_media_info_to(DST, &(frag->av_info->media_info),frag->av_info->media_optinfo->opt,frag->av_info->media_optinfo->opt_num,thread_seq);
+ }
+ else
+ {
+ AV_send_media_info_to(DST, &(frag->av_info->media_info),NULL,0,thread_seq);
+ }
+ frag_write_to_log(FRAG_META_LOG, frag, a_http, a_tcp, thread_seq);
+ frag->send_meta_flag = 1;
+
+ /*�����ݲŷ�Ԫ��Ϣ*/
+ frag_write_to_log(FRAG_ZERO_LOG, frag, a_http, a_tcp,thread_seq);
+ }
+ AV_send_data_to(DST, frag->av_info->media_info.pid,
+ frag->cont_offset,
+ (const char*)data,
+ data_len,
+ frag->av_info->media_info.protocol,
+ thread_seq, (const struct layer_addr*)(&a_tcp->addr));
+ frag->cont_offset += data_len;
+ /*stat*/
+ switch(frag->media_type)
+ {
+ case FILE_OSMF:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ case FILE_HLS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ case FILE_IOS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ case FILE_ANDRIOD:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ case FILE_APP:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ case FILE_FRAG:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ case FILE_MAYBE_FRAG:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ default:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_PKTS], 0, FS_OP_ADD, 1);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_BYTES], 0, FS_OP_ADD, data_len);
+ break;
+ }
+ return PROT_STATE_GIVEME;
+}
+
+int init_frag(frag_info_t **frag, int thread_seq)
+{
+ frag_info_t* pme = (frag_info_t*)dictator_malloc(thread_seq, sizeof(frag_info_t));
+ memset(pme, 0 ,sizeof(frag_info_t));
+ *frag = pme;
+ return 0;
+}
+
+int release_frag(frag_info_t *frag, int thread_seq)
+{
+ if(frag == NULL)
+ {
+ return -1;
+ }
+ if(frag->mid != NULL)
+ {
+ Maat_clean_status(&(frag->mid));
+ frag->mid = NULL;
+ }
+ for(int i=0;i<FRAG_OPR_NUM;i++)
+ {
+ if(NULL!=frag->frag_opt[i].opt_value)
+ {
+ dictator_free(thread_seq, frag->frag_opt[i].opt_value);
+ }
+ }
+ if(frag->av_info!= NULL)
+ {
+ if(NULL!=frag->av_info->media_optinfo)
+ {
+ destroy_media_opt(&(frag->av_info->media_optinfo), thread_seq);
+ frag->av_info->media_optinfo = NULL;
+ }
+ dictator_free(thread_seq,frag->av_info);
+ }
+ dictator_free(thread_seq, frag);
+ return 0;
+}
+
+void free_media(void* data)
+{
+ if(NULL!=data)
+ {
+ free(data);
+ }
+}
+
+int expire_media_hash_node(void *data, int eliminate_type)
+{
+ media_format_hnode_t* mdi = (media_format_hnode_t*)data;
+
+ MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_INFO, FRGMNT_PLUGIN_NAME,
+ "{%s:%d} expire_media_hash_node %s: related_id: %llu, media_type:0x%02x]",
+ __FILE__,__LINE__,
+ hash_eliminate_type[eliminate_type],
+ mdi->relate_id, mdi->media_type);
+ return 1;
+}
+
+uint64 make_frag_pid(frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ uint32 tcp_seq = 0;
+ uint32 tcp_seq_len = sizeof(uint32);
+ uchar proxy_type = 0;
+ int rec = 0;
+
+ /*create tuple7_id*/
+ if((a_tcp->pfather != NULL) && (a_tcp->pfather->type == STREAM_TYPE_HTTP_PROXY))
+ {
+ proxy_type = a_tcp->pfather->type;
+ }
+ rec = MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, (void*)&tcp_seq, (int*)&tcp_seq_len);
+ if(rec < 0)
+ {
+ tcp_seq = 0;
+ }
+ snprintf(frag->tuple7,sizeof(frag->tuple7),"%s,%u,%u,%u",printaddr(&a_tcp->addr,thread_seq),tcp_seq,proxy_type,a_http->http_session_seq);
+ return AV_make_porg_id((unsigned char*)frag->tuple7, strlen(frag->tuple7), PID_TYPE_RESORT);
+}
+
+/*compileֵԽ�����ȼ���Խ��*/
+int select_maat_result(struct Maat_rule_t maat_result[],int result_num)
+{
+ int best_idx = 0;
+ int max_config = maat_result[0].config_id;
+ for(int i=1;i<result_num;i++)
+ {
+ if(max_config<maat_result[i].config_id)
+ {
+ best_idx = i;
+ max_config = maat_result[i].config_id;
+ }
+ }
+ return best_idx;
+}
+
+int is_frag_server_ip(struct streaminfo *a_tcp)
+{
+ int ret = -1;
+ Maat_rule_t result;
+ scan_status_t mid = NULL;
+
+ ret = Maat_scan_proto_addr(g_AV_global_feather,
+ g_frag_prog_para.frag_serverip_tableid,
+ (struct ipaddr*)(&a_tcp->addr),
+ 6,
+ &result,
+ 1,
+ &mid,
+ a_tcp->threadnum);
+
+ Maat_clean_status(&mid);
+ if(ret > 0)
+ {
+ return 1;
+ }
+ return 0;
+}
+
+uchar set_frgmnt_hit(struct Maat_rule_t maat_result[], int result_num,frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ scan_status_t mid = NULL;
+
+ int idx = select_maat_result(maat_result, result_num);
+ if(NULL!=maat_result)
+ {
+ frag->config_id = maat_result[idx].config_id;
+ frag->service_id = maat_result[idx].service_id;
+ frag->proto = AV_PROTOCOL_HTTP;
+ frag->media_type = maat_result[idx].do_blacklist;
+ /*���ݷ���index ����rssb*/
+ frag->session_type = maat_result[idx].action;
+ /*0:������Ϣ 1/2:������Ϣ*/
+ frag->data_dir = maat_result[idx].do_log;
+ }
+
+ /*C2S��������IP����FRAG_SERVER_IP����Ҫ����ƬԤ�棬�����ĺ����ݵĶ�Ҫ��*/
+ /*S2C��������IP����FRAG_SERVER_IP, ��Ϊ����Ƭ*/
+ if(g_frag_prog_para.frag_forecast_switch && 0!=frag->data_dir)
+ {
+ if(is_frag_server_ip(a_tcp))
+ {
+ frag->hit_server_ip = 1;
+ }
+ if(frag->media_type==FILE_MAYBE_FRAG)
+ {
+ /*FILE_FRAG_FORECAST�ǽ�������������,��S2CӦ���ʶ��ģ����ܴ���URL*/
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HIT_SESSION], 0, FS_OP_ADD, 1);
+ if(a_http->p_url!=NULL)
+ {
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[URLDROP_SESSION], 0, FS_OP_ADD, 1);
+ return PROT_STATE_DROPME;
+ }
+ if(frag->hit_server_ip==0)
+ {
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[DROP_SESSION], 0, FS_OP_ADD, 1);
+ return PROT_STATE_DROPME;
+ }
+ }
+ }
+
+ frag->proc_stat = FRAG_PROC_STAT_HIT;
+ frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq);
+
+ switch(frag->media_type)
+ {
+ case FILE_OSMF:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_HLS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_IOS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_ANDRIOD:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_APP:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_FRAG:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_MAYBE_FRAG:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ default:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ }
+ if(frag->session_type==SESSION_FRGMNT_INDEX)
+ {
+ switch(frag->media_type)
+ {
+ case FILE_HLS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_INDEX_SESSION], 0, FS_OP_ADD, 1);
+ if(a_http->cont_encoding==HTTP_CONT_ENCOD_GZIP)
+ {
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[INDEX_GZIP], 0, FS_OP_ADD, 1);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ switch(frag->media_type)
+ {
+ case FILE_OSMF:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_HLS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_IOS:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_ANDRIOD:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_APP:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_FRAG:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ case FILE_MAYBE_FRAG:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ default:
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_SESSION], 0, FS_OP_ADD, 1);
+ break;
+ }
+ }
+}
+
+int set_osmf_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ frag->config_id = 0;
+ frag->service_id = SERVICE_OSMF_DATA;
+ frag->proto = AV_PROTOCOL_HTTP;
+ frag->media_type = FILE_OSMF;
+ frag->session_type = SESSION_FRGMNT_DATA;
+ frag->data_dir = FRAG_TYPE_DATA_S2C;
+
+ frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1);
+ return 1;
+}
+
+/*
+int set_hls_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ frag->config_id = 0;
+ frag->service_id = SERVICE_HLS_DATA;
+ frag->proto = AV_PROTOCOL_HTTP;
+ frag->media_type = FILE_HLS;
+ frag->session_type = SESSION_FRGMNT_DATA;
+ frag->data_dir = FRAG_TYPE_DATA_S2C;
+
+ frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1);
+ return 1;
+}
+*/
+
+int set_app_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ frag->config_id = 0;
+ frag->service_id = SERVICE_APP;
+ frag->proto = AV_PROTOCOL_HTTP;
+ frag->media_type = FILE_APP;
+ frag->session_type = SESSION_FRGMNT_UNKNOW;
+ frag->data_dir = FRAG_TYPE_DATA_S2C;
+
+ frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_SESSION], 0, FS_OP_ADD, 1);
+ return 1;
+}
+
+static int url_indicated_app(const char *url)
+{
+ const char *pfilename = NULL, *p1 = NULL, *p2 = NULL, *p3 = NULL, *p = NULL;
+ unsigned int len=0;
+
+ if(url == NULL)
+ return 0;
+
+ pfilename = strrchr(url, '/');
+ if(pfilename == NULL || *(pfilename + 1) == '\0')
+ return 0;
+
+ pfilename = pfilename + 1;
+ len = strlen(pfilename);
+
+ p1 = strchr(pfilename, ';');
+ p2 = strchr(pfilename, '?');
+ p3 = strchr(pfilename, '#');
+
+ if(p1>p2)
+ {
+ p = p1; p1 = p2; p2 = p;
+ }
+ if(p1>p3)
+ {
+ p = p3; p1 = p3; p3 = p;
+ }
+ if(p2>p3)
+ {
+ p = p2; p2 = p3; p3 = p;
+ }
+ if(p1 != NULL)
+ len = p1 - pfilename;
+ else if(p2 != NULL)
+ len = p2 - pfilename;
+ else if(p3 != NULL)
+ len = p3 - pfilename;
+
+ if(!strncasecmp(pfilename + len - 4, ".apk", 4))
+ return FILE_ANDRIOD;
+ if(!strncasecmp(pfilename + len - 4, ".ipa", 4))
+ return FILE_IOS;
+ return 0;
+}
+
+void create_related_id(frag_info_t *frag, http_infor* a_http, struct streaminfo *a_tcp)
+{
+ if(frag->related_id!=0) return;
+ uint64 cont_len = a_http->cont_length;
+
+ /*��¼�ļ����ȣ���������related_id, ֻ��������S2C������������content_range��lenΪ�ļ��ܳ���*/
+ if(a_http->cont_range!=NULL)
+ {
+ cont_len = a_http->cont_range->len;
+ }
+
+ if(cont_len==0) return;
+
+ struct layer_addr* addr = &a_tcp->addr;
+ uchar pid_buf[PID_BUF_LEN] = {0};
+ if(addr->addrtype == ADDR_TYPE_IPV4)
+ {
+ struct tuple4* tuple4 = (struct tuple4 *)(addr->paddr);
+ char ip4saddr[IPV4_ADDR_LEN] = {0};
+ char ip4daddr[IPV4_ADDR_LEN] = {0};
+ inet_ntop(AF_INET, &tuple4->saddr, ip4saddr, IPV4_ADDR_LEN);
+ inet_ntop(AF_INET, &tuple4->daddr, ip4daddr, IPV4_ADDR_LEN);
+ snprintf((char*)pid_buf, PID_BUF_LEN, "IPV4%s,%s,%llu", ip4saddr, ip4daddr, cont_len);
+ }
+ else if(addr->addrtype == ADDR_TYPE_IPV6)
+ {
+ struct tuple6* tuple6 = (struct tuple6 *)(addr->paddr);
+ char ip6saddr[IPV6_ADDR_LEN] = {0};
+ char ip6daddr[IPV6_ADDR_LEN] = {0};
+ inet_ntop(AF_INET6, &tuple6->saddr, ip6saddr, IPV6_ADDR_LEN);
+ inet_ntop(AF_INET6, &tuple6->daddr, ip6daddr, IPV6_ADDR_LEN);
+ snprintf((char*)pid_buf, PID_BUF_LEN, "IPV6%s,%s,%llu", ip6saddr, ip6daddr, cont_len);
+ }
+ else
+ {
+ return;
+ }
+ frag->related_id = AV_make_porg_id(pid_buf, (unsigned int)strlen((const char*)pid_buf), PID_TYPE_RESORT);
+}
+
+long media_hash_search_cb(void *data, const uint8_t *key, uint size, void *user_arg)
+{
+ frag_info_t* frag = (frag_info_t*)user_arg;
+ if(NULL!=data)
+ {
+ frag->media_type = ((media_format_hnode_t*)data)->media_type;
+ return 1;
+ }
+ return 0;
+}
+
+int frag_multithread_hash_search(const char *pHeader, int iHeaderLen, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ if(!g_frag_prog_para.app_switch && !g_frag_prog_para.osmf_data_feature_switch) return 0;
+
+ long rec_cb = 0;
+ char* find_pos = NULL;
+ media_format_hnode_t* media_format_hnode = NULL;
+
+ create_related_id(frag, a_http, a_tcp);
+ if(0!=frag->related_id)
+ {
+ /*���߳���������£���ѯmedia_hash����ʶ��*/
+ MESA_htable_search_cb(g_frag_prog_para.media_hash,
+ (const uint8_t *)&frag->related_id,
+ sizeof(frag->related_id),
+ media_hash_search_cb,
+ (void*)frag,
+ &rec_cb);
+ if(rec_cb==0)
+ {
+ /*����������ƥ��*/
+ if(g_frag_prog_para.app_switch && g_frag_prog_para.app_gzip_switch)
+ {
+ find_pos = (char*)memmem(pHeader, iHeaderLen, g_app_feature, sizeof(g_app_feature));
+ if(NULL!=find_pos)
+ {
+ frag->media_type = FILE_APP;
+ }
+ }
+ if(g_frag_prog_para.osmf_data_feature_switch)
+ {
+ find_pos = (char*)memmem(pHeader, iHeaderLen, g_osmf_feature, sizeof(g_osmf_feature));
+ if(NULL!=find_pos)
+ {
+ frag->media_type = FILE_OSMF;
+ }
+ }
+ if(NULL!=find_pos)
+ {
+ /*add prog_id , ��ͨ���߳��ļ�����*/
+ media_format_hnode = (media_format_hnode_t*)malloc(sizeof(media_format_hnode_t));
+ media_format_hnode->media_type = frag->media_type;
+ media_format_hnode->relate_id = frag->related_id;
+ MESA_htable_add(g_frag_prog_para.media_hash, (const unsigned char*)&frag->related_id, sizeof(frag->related_id), (const void*)media_format_hnode);
+ /*write log*/
+ MESA_handle_runtime_log(g_frag_prog_para.logger,RLOG_LV_INFO,FRGMNT_PLUGIN_NAME,
+ (char*)"[%s:%d] media_format_indentify MESA_htable_add related_id:%llu, media_type:0x%02x, url:%s." ,
+ __FILE__,__LINE__, frag->related_id, frag->media_type, a_http->p_url);
+ }
+ }
+
+ /*����������*/
+ if(rec_cb || NULL!=find_pos)
+ {
+ switch(frag->media_type)
+ {
+ case FILE_OSMF:
+ set_osmf_hit(frag, a_http, a_tcp, thread_seq);
+ return 1;
+ break;
+
+ case FILE_APP:
+ set_app_hit(frag, a_http, a_tcp, thread_seq);
+ frag->session_type = SESSION_FRGMNT_DATA;
+ return 1;
+ break;
+
+ default:
+ break;
+ }
+ }
+ }
+ return 0;
+}
+
+char frag_scan(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ int ret = 0;
+ uchar return_value = PROT_STATE_GIVEME;
+ const char* scan_data = NULL;
+ uint32 scan_datalen = 0;
+ const char* region_name = {0};
+ uint32 region_name_len = 0;
+ int found_pos[MAAT_RESULT_NUM] = {0};
+ int hit_result = 0;
+ struct Maat_rule_t maat_result[MAAT_RESULT_NUM];
+ char url[2048] = {0};
+
+ memset(&maat_result, 0, sizeof(maat_result));
+ /*scan preproc*/
+ switch(session_info->prot_flag)
+ {
+ case HTTP_CONTENT:
+ /*����ֻɨ���װ�*/
+ if((session_info->prot_flag==HTTP_CONTENT||session_info->prot_flag==HTTP_UNGZIP_CONTENT)
+ && a_http->curdir==DIR_S2C && frag->s2c_first_pkt==0)
+ {
+ frag->s2c_first_pkt = 1;
+ scan_data = (char*)session_info->buf;
+ scan_datalen = session_info->buflen;
+ }
+ break;
+
+ case HTTP_CONT_LENGTH:
+ case HTTP_CONT_RANGE:
+ case HTTP_USER_AGENT:
+ case HTTP_REFERER:
+ case HTTP_COOKIE:
+ case HTTP_HOST:
+ /*����Ҫɨ��*/
+ break;
+ case HTTP_CONT_TYPE:
+ if(a_http->curdir==DIR_S2C)
+ {
+ scan_data = (char*)session_info->buf;
+ scan_datalen = session_info->buflen;
+ }
+ break;
+
+ default:
+ if(g_frag_prog_para.app_switch && session_info->prot_flag==HTTP_MESSAGE_URL)
+ {
+ memcpy(url, session_info->buf, MIN((int)session_info->buflen, (int)(sizeof(url)-1)));
+ frag->media_type = url_indicated_app(url);
+ if(frag->media_type)
+ {
+ hit_result = set_app_hit(frag,a_http,a_tcp,thread_seq);
+ }
+ }
+ if(!frag->media_type)
+ {
+ scan_data = (char*)session_info->buf;
+ scan_datalen = session_info->buflen;
+ }
+ break;
+ }
+
+ /*scan*/
+ if(NULL!=scan_data)
+ {
+ /* ����ʽɨ��*/
+ region_name = http_proto_flag2region(session_info->prot_flag);
+ region_name_len = strlen(region_name);
+ ret=Maat_set_scan_status(g_frag_prog_para.feather, &frag->mid, MAAT_SET_SCAN_DISTRICT,region_name,strlen(region_name));
+ if(ret<0)
+ {
+ return PROT_STATE_DROPME;
+ }
+ hit_result = Maat_full_scan_string( g_frag_prog_para.feather, g_frag_prog_para.expr_tableid, CHARSET_GBK,
+ scan_data, scan_datalen,
+ maat_result, found_pos, MAAT_RESULT_NUM, &(frag->mid),thread_seq);
+ /*IP��ַɨ��*/
+ if(g_frag_prog_para.ip_switch && hit_result<=0 && frag->s2c_first_pkt)
+ {
+ /*S2C�װ��޷����У�ɨ��IP��ַ*/
+ hit_result = Maat_scan_proto_addr(g_frag_prog_para.feather,g_frag_prog_para.ip_tableid,
+ (struct ipaddr*)&a_tcp->addr, TCL_PROTOCOL,
+ maat_result,MAAT_RESULT_NUM,&(frag->mid),thread_seq);
+ }
+
+ /* ���߳������HASH����*/
+ if(hit_result<=0 && frag->s2c_first_pkt)
+ {
+ hit_result = frag_multithread_hash_search(scan_data,scan_datalen,frag,a_http,a_tcp,thread_seq);
+ /*����ɨ�裬DROP�Ự*/
+ if(hit_result<=0)
+ {
+ //if(scan_data!=NULL && *scan_data==0x47)
+ //{
+ // hit_result = set_hls_hit(frag, a_http, a_tcp, thread_seq);
+ //}
+ //else
+ //{
+ return PROT_STATE_DROPME;
+ //}
+ }
+ }
+ }
+
+ if(hit_result>0)
+ {
+ return_value = set_frgmnt_hit(maat_result,hit_result,frag,a_http,a_tcp,thread_seq);
+ }
+ return return_value;
+}
+
+char frag_save(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ int opt_type = -1;
+
+ switch(session_info->prot_flag)
+ {
+ case HTTP_MESSAGE_URL:
+ opt_type = FRAG_URL;
+ break;
+ case HTTP_HOST:
+ opt_type = FRAG_HOST;
+ break;
+ case HTTP_USER_AGENT:
+ opt_type = FRAG_UA;
+ break;
+ case HTTP_REFERER:
+ opt_type = FRAG_REFERER;
+ break;
+ case HTTP_CONT_TYPE:
+ if(a_http->curdir==DIR_S2C)
+ {
+ opt_type = FRAG_S2C_CONTENT_TYPE;
+ }
+ break;
+ default:
+ break;
+ }
+ if(-1!=opt_type && frag->frag_opt[opt_type].opt_value==NULL)
+ {
+ frag->frag_opt[opt_type].opt_value = (char*)dictator_malloc(thread_seq, session_info->buflen+1);
+ memcpy(frag->frag_opt[opt_type].opt_value, session_info->buf, session_info->buflen);
+ frag->frag_opt[opt_type].opt_value[session_info->buflen] = '\0';
+ frag->frag_opt[opt_type].opt_len = session_info->buflen+1;
+ }
+
+ return 0;
+}
+
+uchar frag_sendback(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
+{
+ uchar return_stat = PROT_STATE_GIVEME;
+
+ /*����������Ϣ����ȷ��Ŀ�ĵأ�ͨ��a_tcp->dir�ж��Ƿ��ǵ������پ���Ŀ�ĵ�*/
+ if(frag->session_type==SESSION_FRGMNT_UNKNOW)
+ {
+ if(a_tcp->dir == DIR_DOUBLE)
+ {
+ frag->session_type = SESSION_FRGMNT_DATA;
+ }
+ else
+ {
+ frag->session_type = SESSION_FRGMNT_INDEX;
+ }
+ }
+
+ /*change because of a_tcp->dir is not accurate*/
+ if(frag->session_type == SESSION_FRGMNT_INDEX)
+ {
+ if(a_tcp->dir==DIR_DOUBLE && frag->data_dir!=0)
+ {
+ frag->session_type = SESSION_FRGMNT_DATA;
+ }
+ }
+
+
+ /*index need cache;*/
+ if(frag->session_type == SESSION_FRGMNT_INDEX)
+ {
+ if(DIR_S2C==a_http->curdir && (session_info->prot_flag==HTTP_CONTENT || session_info->prot_flag==HTTP_UNGZIP_CONTENT))
+ {
+ if(frag->frag_opt[FRAG_S2C_CONTENT].opt_len<g_frag_prog_para.s2c_cache_size)
+ {
+ frag->frag_opt[FRAG_S2C_CONTENT].opt_value = (char*)dictator_realloc(thread_seq, frag->frag_opt[FRAG_S2C_CONTENT].opt_value, frag->frag_opt[FRAG_S2C_CONTENT].opt_len+session_info->buflen);
+ memcpy(frag->frag_opt[FRAG_S2C_CONTENT].opt_value+frag->frag_opt[FRAG_S2C_CONTENT].opt_len, session_info->buf, session_info->buflen);
+ frag->frag_opt[FRAG_S2C_CONTENT].opt_len += session_info->buflen;
+ }
+ else
+ {
+ frag->proc_stat = FRAG_PROC_STAT_SCAN;
+ return PROT_STATE_DROPME;
+ }
+ }
+ }
+
+ /*send ȷ������CPZ*/
+ if(frag->session_type==SESSION_FRGMNT_DATA)
+ {
+ if(a_http->res_code==200 || a_http->res_code==206 || a_http->res_code==0)
+ {
+ if(session_info->prot_flag==HTTP_CONTENT || session_info->prot_flag==HTTP_UNGZIP_CONTENT)
+ {
+ return_stat = frgmnt_av_sendback(frag,(char*)session_info->buf,session_info->buflen,a_http,a_tcp,thread_seq);
+ }
+ }
+ else
+ {
+ frag->proc_stat = FRAG_PROC_STAT_SCAN;
+ return_stat = PROT_STATE_DROPME;
+ }
+ }
+ return return_stat;
+}
+
+uchar FRAG_MONITOR_ENTRY(stSessionInfo* session_info, void **param,int thread_seq, struct streaminfo *a_tcp, void *a_packet)
+{
+ if(NULL==session_info)
+ {
+ return PROT_STATE_DROPME;
+ }
+ uchar return_stat = PROT_STATE_GIVEME;
+ http_infor* a_http = (http_infor *)(session_info->app_info);
+ frag_info_t* frag = (frag_info_t*)*param;
+
+ /*other HTTP, do not proc*/
+ if(a_http->method!=HTTP_METHOD_UNKNOWN&&a_http->method!=HTTP_METHOD_GET)
+ {
+ return PROT_STATE_DROPME;
+ }
+
+ /*init*/
+ if(frag==NULL)
+ {
+ if(init_frag(&frag, thread_seq) <0)
+ {
+ return PROT_STATE_DROPME;
+ }
+ *param = frag;
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HTTP_SESSION], 0, FS_OP_ADD, 1);
+ }
+
+ if(session_info->prot_flag!=HTTP_STATE && frag->proc_stat==FRAG_PROC_STAT_SCAN)
+ {
+ return_stat = frag_scan(session_info,frag,a_http,a_tcp,thread_seq);
+ if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION;
+ if(frag->proc_stat==FRAG_PROC_STAT_HIT)
+ {
+ frag->proc_stat = FRAG_PROC_STAT_SEND;
+ }
+ }
+
+ /*��ƬԪ��Ϣ�洢*/
+ frag_save(session_info,frag,a_http,a_tcp,thread_seq);
+
+ /*�ȴ�refer,��FD��־*/
+ if(frag->proc_stat==FRAG_PROC_STAT_SEND)
+ {
+ /*��Ƭ���ܿ�*/
+ if(NULL!=a_http->p_url
+ && DIR_C2S==a_http->curdir
+ && session_info->prot_flag==HTTP_STATE
+ && HTTP_DATA_BEGIN==a_http->http_state)
+ {
+ return_stat = frag_check_block(frag, a_http, a_tcp, a_packet, thread_seq);
+ if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION;
+ }
+ }
+
+ if(session_info->prot_flag!=HTTP_STATE && frag->proc_stat==FRAG_PROC_STAT_SEND)
+ {
+ /*���ݻش�*/
+ return_stat = frag_sendback(session_info,frag,a_http,a_tcp,thread_seq);
+ if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION;
+ }
+
+DROP_SESSION:
+ /*release*/
+ if(return_stat==PROT_STATE_DROPME||session_info->session_state&SESSION_STATE_CLOSE)
+ {
+ /*ȷ��frag*/
+ project_req_add_char(a_tcp, g_frag_prog_para.frag_project_id, 0);
+
+ if(frag->proc_stat==FRAG_PROC_STAT_SEND)
+ {
+ if(frag->session_type==SESSION_FRGMNT_INDEX)
+ {
+ frgmnt_index_sendback(frag,a_http,a_tcp,thread_seq);
+ frag_write_to_log(FRAG_INDEX_LOG, frag, a_http, a_tcp, thread_seq);
+ }
+ /*��������״̬������kafka*/
+ if(g_frag_prog_para.frag_report_switch)
+ {
+ frag_json_report(LOCAL_LOG_TYPE_FRAG_FIND_PROG,frag, a_http, a_tcp, thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_PROG_TOPIC], 0, FS_OP_ADD, 1);
+ }
+ /*��ƬԤ�淢��kafka,��ʱֻ����Ƭ��*/
+ if(g_frag_prog_para.frag_forecast_switch && 0!=frag->data_dir && a_http->p_url!=NULL && frag->hit_server_ip==0)
+ {
+ frag_json_report(LOCAL_LOG_TYPE_FRAG_SERVER_IP,frag, a_http, a_tcp, thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_URL_TOPIC], 0, FS_OP_ADD, 1);
+ }
+ }
+ release_frag((frag_info_t*)*param, thread_seq);
+ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HTTP_SESSION_CLOSE], 0, FS_OP_ADD, 1);
+ *param = NULL;
+ }
+ return return_stat;
+}
+
+int read_main_conf()
+{
+ char log_filename[MAX_PATH_LEN]={0};
+ char table_info_filename [MAX_PATH_LEN]={0};
+ char config_buff[MAX_PATH_LEN]={0};
+ int log_level = 0;
+
+ g_frag_prog_para.avconf_handle = wired_cfg_create("AV_CONF", "./avconf/av.conf");
+ wired_cfg_init(g_frag_prog_para.avconf_handle);
+
+ g_frag_prog_para.avcomconf_handle = wired_cfg_create("AV_COM", "./avconf/av_com.conf");
+ wired_cfg_init(g_frag_prog_para.avcomconf_handle);
+
+ //LOG
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.log_switch = (short)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogLevel",config_buff,sizeof(config_buff),"30");
+ log_level = (short)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogPath",g_frag_prog_para.log_path,sizeof(config_buff),"./avlog/frag_monitor/");
+ snprintf(log_filename, sizeof(log_filename), "%s/%s", g_frag_prog_para.log_path, "runtime.log");
+ g_frag_prog_para.logger = MESA_create_runtime_log_handle(log_filename,log_level);
+ if(NULL==g_frag_prog_para.logger)
+ {
+ printf("MESA_create_runtime_log_handle error.\n");
+ return -1;
+ }
+
+ /*maat*/
+#if K_PROJECT
+ g_frag_prog_para.frag_url_table_id = Maat_table_register(g_AV_global_feather, "MM_GLOBAL_FRAG_INDEX_URL");
+#else
+ g_frag_prog_para.frag_url_table_id = Maat_table_register(g_AV_global_feather, "GLOBAL_FRAG_INDEX_URL");
+#endif
+ int value = 0;
+ g_frag_prog_para.fs_handle = FS_create_handle();
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatFile",config_buff,sizeof(config_buff),"./avlog/frag_monitor/frag_monitor_status.log");
+ FS_set_para(g_frag_prog_para.fs_handle, OUTPUT_DEVICE, config_buff, strlen(config_buff)+1);
+ value = 1;//flush by date
+ FS_set_para(g_frag_prog_para.fs_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ value = 2;//append
+ FS_set_para(g_frag_prog_para.fs_handle, PRINT_MODE, &value, sizeof(value));
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatInterval",config_buff,sizeof(config_buff),"30");
+ value = atoi(config_buff);
+ FS_set_para(g_frag_prog_para.fs_handle, STAT_CYCLE, &value, sizeof(value));
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatSwitch",config_buff,sizeof(config_buff),"1");
+ value = atoi(config_buff);
+ FS_set_para(g_frag_prog_para.fs_handle, PRINT_TRIGGER, &value, sizeof(value));
+ if(value)
+ {
+ /*���ܿؼ��һ��*/
+ wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","fs2_ip",config_buff,sizeof(config_buff),"");
+ FS_set_para(g_frag_prog_para.fs_handle, STATS_SERVER_IP, config_buff, strlen(config_buff)+1);
+ wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","fs2_port",config_buff,sizeof(config_buff),"8125");
+ unsigned short port = atoi(config_buff);
+ FS_set_para(g_frag_prog_para.fs_handle, STATS_SERVER_PORT, &port, sizeof(port));
+ }
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FRAG_NAME",config_buff,sizeof(config_buff),"frag_monitor");
+ FS_set_para(g_frag_prog_para.fs_handle, APP_NAME, config_buff, strlen(config_buff)+1);
+ value = 1;
+ FS_set_para(g_frag_prog_para.fs_handle, CREATE_THREAD, &value, sizeof(value));
+ g_frag_prog_para.fs_id[HTTP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HTTP_links");
+ g_frag_prog_para.fs_id[HLS_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_links");
+ g_frag_prog_para.fs_id[OSMF_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_links");
+ g_frag_prog_para.fs_id[IOS_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_links");
+ g_frag_prog_para.fs_id[ANDRIOD_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_links");
+ g_frag_prog_para.fs_id[APP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_links");
+ g_frag_prog_para.fs_id[HIT_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HIT_links");
+ g_frag_prog_para.fs_id[URLDROP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "URLDROP_links");
+ g_frag_prog_para.fs_id[DROP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP_links");
+ g_frag_prog_para.fs_id[FRAG_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_links");
+ g_frag_prog_para.fs_id[MAYBE_FRAG_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MATBE_links");
+ g_frag_prog_para.fs_id[WEIRD_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_links");
+ g_frag_prog_para.fs_id[HLS_INDEX_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_index");
+ g_frag_prog_para.fs_id[HLS_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_data");
+ g_frag_prog_para.fs_id[OSMF_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_data");
+ g_frag_prog_para.fs_id[IOS_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_data");
+ g_frag_prog_para.fs_id[ANDRIOD_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_data");
+ g_frag_prog_para.fs_id[APP_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_data");
+ g_frag_prog_para.fs_id[FRAG_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_data");
+ g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MATBE_data");
+ g_frag_prog_para.fs_id[WEIRD_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_data");
+ g_frag_prog_para.fs_id[HLS_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_d_pkts");
+ g_frag_prog_para.fs_id[HLS_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_d_bytes");
+ g_frag_prog_para.fs_id[OSMF_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_d_pkts");
+ g_frag_prog_para.fs_id[OSMF_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_d_bytes");
+ g_frag_prog_para.fs_id[IOS_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_d_pkts");
+ g_frag_prog_para.fs_id[IOS_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_d_bytes");
+ g_frag_prog_para.fs_id[ANDRIOD_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_d_pkts");
+ g_frag_prog_para.fs_id[ANDRIOD_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_d_bytes");
+ g_frag_prog_para.fs_id[APP_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_d_pkts");
+ g_frag_prog_para.fs_id[APP_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_d_bytes");
+ g_frag_prog_para.fs_id[FRAG_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_d_pkts");
+ g_frag_prog_para.fs_id[FRAG_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_d_bytes");
+ g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MAYBE_d_pkts");
+ g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MAYBE_d_bytes");
+ g_frag_prog_para.fs_id[WEIRD_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_d_pkts");
+ g_frag_prog_para.fs_id[WEIRD_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_d_bytes");
+ g_frag_prog_para.fs_id[INDEX_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "index_topic");
+ g_frag_prog_para.fs_id[CNVG_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "cnvg_topic");
+ g_frag_prog_para.fs_id[FRAG_PROG_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "prog_topic");
+ g_frag_prog_para.fs_id[FRAG_URL_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "url_topic");
+ g_frag_prog_para.fs_id[URL_META] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "url_meta");
+ g_frag_prog_para.fs_id[KEY_META] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "key_meta");
+ g_frag_prog_para.fs_id[HTTP_SESSION_CLOSE] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HTTP_close");
+ g_frag_prog_para.fs_id[INDEX_GZIP] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "index_gzip");
+ FS_start(g_frag_prog_para.fs_handle);
+
+ /*media_hash*/
+ uint32_t media_hash_size = 0;
+ uint32_t media_hash_max_elem_num = 0;
+ uint32_t media_hash_expire_time = 0;
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashSize",config_buff,sizeof(config_buff),"65536");
+ media_hash_size = atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashElemNum",config_buff,sizeof(config_buff),"1048576");
+ media_hash_max_elem_num = atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashExpireTime",config_buff,sizeof(config_buff),"120");
+ media_hash_expire_time = atoi(config_buff);
+ MESA_htable_create_args_t hash_args;
+ memset(&hash_args,0,sizeof(MESA_htable_create_args_t));
+ hash_args.thread_safe = 1; //group lock
+ hash_args.recursive = 1;
+ hash_args.hash_slot_size = media_hash_size;
+ hash_args.max_elem_num = media_hash_max_elem_num;
+ hash_args.eliminate_type = HASH_ELIMINATE_ALGO_LRU;
+ hash_args.expire_time = media_hash_expire_time;
+ hash_args.key_comp = NULL;
+ hash_args.key2index = NULL;
+ hash_args.data_free = free_media;
+ hash_args.data_expire_with_condition = expire_media_hash_node;
+ g_frag_prog_para.media_hash = MESA_htable_create(&hash_args, sizeof(hash_args));
+ if(NULL==g_frag_prog_para.media_hash)
+ {
+ printf("create media_hash error.\n");
+ return -1;
+ }
+
+ //SYSTEM
+ /*app switch*/
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","AppSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.app_switch = (short)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","AppGzipFeatureSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.app_gzip_switch = (short)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","OsmfDataFeatureSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.osmf_data_feature_switch = (short)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","IPSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.ip_switch = (short)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","S2cCacheSize",config_buff,sizeof(config_buff),"500000");
+ g_frag_prog_para.s2c_cache_size = (uint32)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragReportSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.frag_report_switch = (uint32)atoi(config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragForecastSwitch",config_buff,sizeof(config_buff),"0");
+ g_frag_prog_para.frag_forecast_switch = (uint32)atoi(config_buff);
+
+ /*maat*/
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatTableInfo",table_info_filename,sizeof(table_info_filename),"./avconf/frag_monitor/table_info.conf");
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragMonitorJSON",config_buff,sizeof(config_buff),"./avconf/frag_monitor/frag_monitor.json");
+ int scan_detail = 0;
+ g_frag_prog_para.feather = Maat_feather(g_iThreadNum, table_info_filename, g_frag_prog_para.logger);
+ Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_JSON_FILE_PATH, config_buff, strlen(config_buff)+1);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatStatFile",config_buff,sizeof(config_buff),"./avlog/frag_monitor/maat_stat.log");
+ Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_STAT_FILE_PATH, config_buff, strlen(config_buff)+1);
+ int temp = 0;
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatStatSwitch",config_buff,sizeof(config_buff),"0");
+ temp = atoi(config_buff);
+ if(temp)
+ {
+ Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_STAT_ON, NULL, 0);
+ }
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatPerfSwitch",config_buff,sizeof(config_buff),"0");
+ temp = atoi(config_buff);
+ if(temp)
+ {
+ Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_PERF_ON, NULL, 0);
+ }
+ Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail));
+ Maat_initiate_feather(g_frag_prog_para.feather);
+ if(NULL==g_frag_prog_para.feather)
+ {
+ printf("Maat_summon_feather_json error.\n");
+ return -1;
+ }
+ g_frag_prog_para.expr_tableid = Maat_table_register(g_frag_prog_para.feather,"FRAG_MONITOR_KEYWORDS");
+ g_frag_prog_para.ip_tableid = Maat_table_register(g_frag_prog_para.feather,"FRAG_MONITOR_IP");
+
+ //��ƬԤ��
+ g_frag_prog_para.frag_serverip_tableid = Maat_table_register(g_AV_global_feather,"FRAG_SERVER_IP");
+
+ //NETWORK
+ //get locat IP
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"Module","SendBack_Device",config_buff,sizeof(config_buff),"mg0");
+ g_frag_prog_para.local_ip_nr = get_ip_by_ifname(config_buff);
+ if(g_frag_prog_para.local_ip_nr==INADDR_NONE)
+ {
+ printf("get [NETWORK]LocalIP error.\n");
+ return -1;
+ }
+
+ // index Kafka��ʼ��//
+ /*
+ wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0");
+ g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers);
+ if(NULL==g_frag_prog_para.index_kafka_producer)
+ {
+ printf("KafkaProducer error.\n");
+ return -1;
+ }
+ if(0!=g_frag_prog_para.index_kafka_producer->KafkaConnection())
+ {
+ printf("KafkaConnection %s error.\n", g_frag_prog_para.index_brokers);
+ MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME,
+ "{%s:%d} KafkaConnection %s error.",
+ __FILE__,__LINE__, g_frag_prog_para.index_brokers);
+ }
+ else
+ {
+ printf("KafkaConnection %s succ.\n", g_frag_prog_para.index_brokers);
+ MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME,
+ "{%s:%d} KafkaConnection %s succ.",
+ __FILE__,__LINE__, g_frag_prog_para.index_brokers);
+ }
+ */
+
+ //TRACE
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf");
+ g_frag_prog_para.addr_trace = MESA_trace_create_addr_handle(g_frag_prog_para.logger, config_buff);
+ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceLogFile",g_frag_prog_para.trace_dir, sizeof(g_frag_prog_para.trace_dir),"./avlog/frag_monitor/");
+
+ return 0;
+}
+
+extern "C" int FRAG_MONITOR_INIT()
+{
+ if(-1==read_main_conf())
+ {
+ return -1;
+ }
+
+ /*kafka*/
+ if(NULL!=g_multi_kafka_producer)
+ {
+ /*
+ if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA);
+ return -1;
+ }
+ if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL)
+ {
+ printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA);
+ return -1;
+ }
+ */
+ rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new();
+ g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf);
+ frag_topic_conf = rd_kafka_topic_conf_new();
+ g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf);
+ }
+ g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL);
+
+ return 0;
+}
+
+extern "C" void FRAG_MONITOR_DESTROY(void)
+{
+
+}
+
+