diff options
| author | lishu <[email protected]> | 2018-11-30 13:04:50 +0800 |
|---|---|---|
| committer | lishu <[email protected]> | 2018-11-30 13:04:50 +0800 |
| commit | c9ea9d4c57745bd3eb6375f802508428f864120e (patch) | |
| tree | 85336b0df0ddf7d166144ad4eaaebf58fe4db197 /src/frag_monitor.c | |
| parent | 69965fc7d12a4e77d758c9e83232f9fbdab6e634 (diff) | |
2018.11.12 1. 1. set K_PROJECT because different maat table
Diffstat (limited to 'src/frag_monitor.c')
| -rw-r--r-- | src/frag_monitor.c | 1560 |
1 files changed, 1560 insertions, 0 deletions
diff --git a/src/frag_monitor.c b/src/frag_monitor.c new file mode 100644 index 0000000..2f27038 --- /dev/null +++ b/src/frag_monitor.c @@ -0,0 +1,1560 @@ +//author: lishu +//date: 2015-9-23 + + +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <math.h> +#include <net/if.h> +#include <strings.h> +#include <stdio.h> +#include <stdio.h> +#include <assert.h> +#include <ctype.h> + +#include "MESA_handle_logger.h" +#include "MESA_htable.h" +#include "MESA_prof_load.h" +#include "MESA_trace.h" +#include "Maat_rule.h" +#include "wired_cfg.h" +#include "field_stat2.h" +#include "http.h" +#include "AV_feedback.h" +#include "AV_charater.h" +#include "AV_prog_table.h" +#include "AV_log.h" +#include "AV_kill_connection.h" +#include "cJSON.h" +#include "KafkaProducer.h" + +#include "appdsender.h" +#include "opt.h" +#include "opt_in.h" +#include "optregister.h" +#include "frag_monitor.h" +#include "frag_common.h" +#include "frag_block.h" + +int FRAG_MONITOR_VERSION_2_0_20181112 = 0; +void frag_monitor_history() +{ + //2015.09.23 create the project + //2016.05.04 pid add tcp seq + //2016.05.09 add send switch, add static + //2016.05.11 add mediainfo layer_addr + //2016.05.13 proc_frgmnt_hit log + //2016.08.16 content-type sendback + //2016.08.26 ������������߳�����ʶ�� + //2016.11.02 302�����ݲ�������Ƶ���������� + //2017.01.03 mesa_tcp lock, ʹ��field_stateͳ�� + //2017.02.21 support app + //2017.02.24 add switch to close log + //2017.03.02 cache index info and sendto kafka + //2017.03.22 JSON: ISN ADDR_LIST PROTOCOL PROXY -1 to 48 bit + //2017.04.07 1. log thread safe 2. media_type not protocol + //2017.04.11 1. add topic stat + //2017.04.12 1. tuple7 + //2017.04.17 1. cJSON_print free + //2017.04.25 1. HLS OSMF protocol as HTTP + //2017.05.03 1. index gzip static + //2017.05.15 1. OSMF feature change to f4f 2. hls tpl + //2017.05.24 1. frag_fd + //2017.07.13 1. add similar scan + //2017.09.06 1. delete c2s_content 2. time + //2017.09.11 1. reconstruction + //2017.09.23 1. test + //2017.09.25 1. MEDIA_CNVG_TOPIC 2. delete .inf 3.hls_data_s2c + //2017.09.29 1. change index->data when double dir 2. frag_monitor.conf 3. frag_report_switch + //2017.11.14 1. add frag_flag in http session + //2017.11.30 1. add ts_data feature + //2017.12.08 1. modify the method that tuple7 generate pid + //2018.01.12 1. field_stat monitor 2. add url_key opt when send meta + //2018.01.29 1. use kafka_handle defined by av_master + //2018.03.15 1. support other type + //2018.03.16 1. add frag notice + //2018.05.09 1. add frag_json_report, addr+scan server_ip + //2018.07.10 1. 1. refer send to kafka 2. close frag forecast + //2018.07.24 1. 1. create maat feather + //2018.11.12 1. 1. set K_PROJECT because different maat table +} + +frag_monitor_runtime_parameter_t g_frag_prog_para; +/*sapp threadnum*/ +extern int g_iThreadNum; +/*av_master*/ +//extern KafkaProducer* g_multi_kafka_producer; +extern rd_kafka_t* g_multi_kafka_producer; +/*av master*/ +extern Maat_feather_t g_AV_global_feather; + +const char g_app_feature[4] = {0X50, 0X4B, 0X03, 0X04}; +const char g_osmf_feature[4] = {0X6D, 0x64, 0x61, 0x74}; + +const char* g_log_des[FRAG_LOG_NUM] = +{ + "index", + "rssb_meta", + "rssb_zero" +}; + +const char* hash_eliminate_type[3] = +{ + "", + "ELIMINATE_TYPE_NUM", + "ELIMINATE_TYPE_TIME" +}; + +void frag_write_to_log(int type, frag_info_t* frag, http_infor* a_http,struct streaminfo *a_tcp, int thread_seq) +{ + if(!g_frag_prog_para.log_switch) return; + char buf[2048] = {0}; + int buflen = 0; + time_t cur_time; + struct tm now; + char now_time[32] = {0}; + char day_time[32] = {0}; + char filename[MAX_PATH_LEN] = {0}; + + time(&cur_time); + localtime_r(&cur_time, &now); + strftime(now_time, sizeof(now_time), "%Y-%m-%d %H:%M:%S", &now); + + /*init*/ + if(NULL==g_frag_prog_para.fraginfo_file[thread_seq]) + { + strftime(day_time, sizeof(day_time), "%Y-%m-%d", &now); + snprintf(filename, sizeof(filename), "%s/%s_%d.%s", g_frag_prog_para.log_path, "fraginfo" ,thread_seq,day_time); + g_frag_prog_para.fraginfo_file[thread_seq] = fopen(filename, "a+"); + localtime_r(&cur_time, &g_frag_prog_para.fraginfo_filetime[thread_seq]); + } + + if(now.tm_mday!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_mday || now.tm_mon!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_mon || now.tm_year!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_year) + { + if(g_frag_prog_para.fraginfo_file[thread_seq]) + { + strftime(day_time, sizeof(day_time), "%Y-%m-%d", &now); + snprintf(filename, sizeof(filename), "%s/%s_%d.%s", g_frag_prog_para.log_path, "fraginfo" ,thread_seq,day_time); + fclose(g_frag_prog_para.fraginfo_file[thread_seq]); + g_frag_prog_para.fraginfo_file[thread_seq] = fopen(filename, "a+"); + } + localtime_r(&cur_time, &g_frag_prog_para.fraginfo_filetime[thread_seq]); + } + + if(NULL!=frag->frag_opt[FRAG_URL].opt_value) + { + if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value) + { + buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s url:%s s2c_content_type:%s\n", + now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_URL].opt_value,frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value); + } + else + { + buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s url:%s\n", + now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_URL].opt_value); + } + } + else + { + if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value) + { + buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s s2c_content_type:%s\n", + now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value); + } + else + { + buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s\n", + now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7); + } + } + + if(NULL!=g_frag_prog_para.fraginfo_file[thread_seq]) + { + fwrite(buf, strlen(buf),1, g_frag_prog_para.fraginfo_file[thread_seq]); + fflush(g_frag_prog_para.fraginfo_file[thread_seq]); + } +} + +void index_local_log(const char* filename, char* buf, uint32 buflen) +{ + FILE *pFile; + if ((pFile = fopen(filename, "a"))!=NULL) + { + fwrite(buf, buflen, 1, pFile); + fclose(pFile); + } +} + +void frag_json_report(int local_log_type, frag_info_t* frag, http_infor* a_http,struct streaminfo *a_tcp, int thread_seq) +{ + AV_info_t p; + opt_unit_t* opt =NULL; + int opt_num = 0; + + p.a_stream = a_tcp; + p.pid = frag->tuple7_id; + if(frag->frag_opt[FRAG_URL].opt_value!=NULL) + { + p.url = frag->frag_opt[FRAG_URL].opt_value; + } + else + { + p.url = NULL; + } + p.length = a_http->cont_length; + p.protocol = frag->proto; + + /*referer*/ + if(frag->frag_opt[FRAG_REFERER].opt_value!=NULL) + { + opt = (opt_unit_t*)dictator_malloc(thread_seq, sizeof(opt_unit_t)); + opt->opt_len = frag->frag_opt[FRAG_REFERER].opt_len; + opt->opt_type = META_OPT_REFERER; + opt->opt_value = frag->frag_opt[FRAG_REFERER].opt_value; + opt_num ++; + } + AV_local_log_convert_json(&p,local_log_type,opt,opt_num); + if(opt!=NULL) + { + dictator_free(thread_seq, opt); + } +} + +//void frgmnt_index_sendback_more(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) +void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) +{ + cJSON* root; + char* out = NULL; + uint32_t out_len = 0; + char ipbuf[32] = {0}; + int ipbuf_len = 32; + string topic_name; + int cb_ret = 0; + char pid_buf[64] = {0}; + + root = cJSON_CreateObject(); + + /*topic*/ + if(0==frag->data_dir) + { + cJSON_AddStringToObject(root, "topic", TOPIC_MEDIA_INDEX_DATA); + topic_name = TOPIC_MEDIA_INDEX_DATA; + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[INDEX_TOPIC], 0, FS_OP_ADD, 1); + } + else + { + cJSON_AddStringToObject(root, "topic",TOPIC_MEDIA_CONVERGE_DATA); + topic_name = TOPIC_MEDIA_CONVERGE_DATA; + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[CNVG_TOPIC], 0, FS_OP_ADD, 1); + } + + /*service_id*/ + cJSON_AddNumberToObject(root, "service_id", frag->service_id); + /*capIP*/ + inet_ntop(AF_INET, &g_frag_prog_para.local_ip_nr, ipbuf, ipbuf_len); + cJSON_AddStringToObject(root, "cap_ip", ipbuf); + /*tuple7=pid !!!!PID�����ּ���JSON�����*/ + snprintf(pid_buf, sizeof(pid_buf), "%llu", frag->tuple7_id); + cJSON_AddStringToObject(root, "pid", pid_buf); + /*dir*/ + cJSON_AddNumberToObject(root, "dir", a_tcp->dir); + /*curdir*/ + cJSON_AddNumberToObject(root, "curdir", a_http->curdir); + /*url*/ + if(NULL!=frag->frag_opt[FRAG_URL].opt_value) + { + cJSON_AddStringToObject(root, "url", frag->frag_opt[FRAG_URL].opt_value); + } + /*ua*/ + if(NULL!=frag->frag_opt[FRAG_UA].opt_value) + { + cJSON_AddStringToObject(root, "user_agent", frag->frag_opt[FRAG_UA].opt_value); + } + /*referer*/ + if(NULL!=frag->frag_opt[FRAG_REFERER].opt_value) + { + cJSON_AddStringToObject(root, "referer", frag->frag_opt[FRAG_REFERER].opt_value); + } + /*s2c_content*/ + if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT].opt_value) + { + frag->frag_opt[FRAG_S2C_CONTENT].opt_value = (char*)dictator_realloc(thread_seq, frag->frag_opt[FRAG_S2C_CONTENT].opt_value, frag->frag_opt[FRAG_S2C_CONTENT].opt_len+1); + frag->frag_opt[FRAG_S2C_CONTENT].opt_value[frag->frag_opt[FRAG_S2C_CONTENT].opt_len] = '\0'; + frag->frag_opt[FRAG_S2C_CONTENT].opt_len += 1; + cJSON_AddStringToObject(root, "s2c_content", frag->frag_opt[FRAG_S2C_CONTENT].opt_value); + } + out = cJSON_Print(root); + out_len = strlen(out); + if(MESA_trace_match_addr(g_frag_prog_para.addr_trace, (trace_layer_addr*)(&a_tcp->addr))) + { + const char* addr = NULL; + char filename[MAX_PATH_LEN] = {0}; + addr = printaddr(&a_tcp->addr, thread_seq); + snprintf(filename,sizeof(filename),"%s/%s,%u", g_frag_prog_para.trace_dir,addr, a_http->http_session_seq); + index_local_log(filename, out, out_len); + } + /*kfaka*/ + if(NULL!=g_multi_kafka_producer) + { + //cb_ret = g_multi_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len); + cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL); + + } + if(out) + { + free(out); + } + cJSON_Delete(root); +} + +/* +void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) +{ + for(int i=0;i<10000;i++) + { + frgmnt_index_sendback_more(frag,a_http,a_tcp,thread_seq); + } +} +*/ +void create_media_opt(media_opt_info_t** media_optinfo, uchar opt_type, char* opt_value, uint32 opt_len, int thread_seq) +{ + media_opt_info_t* node = *media_optinfo; + if(node==NULL) + { + node = (media_opt_info_t*)dictator_malloc(thread_seq, sizeof(media_opt_info_t)); + node->opt = NULL; + node->opt_content_id = -1; + node->opt_num = 0; + } + int opt_num = node->opt_num; + node->opt = (opt_unit_t*)dictator_realloc(thread_seq, (void*)node->opt, sizeof(struct opt_unit_t)*(1+opt_num)); + memset((void*)&node->opt[opt_num],0,sizeof(struct opt_unit_t)); + node->opt[opt_num].opt_value = (char*)dictator_malloc(thread_seq, opt_len); + memcpy(node->opt[opt_num].opt_value,opt_value,opt_len); + node->opt[opt_num].opt_type = opt_type; + node->opt[opt_num].opt_len = opt_len; + node->opt_num += 1; + (*media_optinfo) = node; +} + +void destroy_media_opt(media_opt_info_t** media_optinfo, int thread_seq) +{ + if((*media_optinfo)->opt!=NULL) + { + for(int i=0;i<(*media_optinfo)->opt_num;i++) + { + if((*media_optinfo)->opt[i].opt_value!=NULL) + { + dictator_free(thread_seq, (*media_optinfo)->opt[i].opt_value); + (*media_optinfo)->opt[i].opt_value = NULL; + } + } + dictator_free(thread_seq, (*media_optinfo)->opt); + (*media_optinfo)->opt = NULL; + } + dictator_free(thread_seq, *media_optinfo); + *media_optinfo = NULL; +} + +void append_media_opt(media_opt_info_t* media_optinfo, uchar opt_id, char* opt_value, uint32 opt_len, int thread_seq) +{ + (media_optinfo)->opt[opt_id].opt_value = (char*)dictator_realloc(thread_seq, (media_optinfo)->opt[opt_id].opt_value, + (media_optinfo)->opt[opt_id].opt_len+opt_len); + memcpy((media_optinfo)->opt[opt_id].opt_value+(media_optinfo)->opt[opt_id].opt_len,opt_value,opt_len); + (media_optinfo)->opt[opt_id].opt_len += opt_len; +} + +int ts_data_check(char* data, uint32 data_len) +{ + return 0; +} + +uchar frgmnt_av_sendback(frag_info_t* frag, char* data, uint32 data_len, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) +{ + char DST = AV_DST_NORMAL; + char query_url_key[1024] = {0}; + int gen_key = 0; + + char serviceid[256] = {0}; + snprintf(serviceid,sizeof(serviceid),"%d",frag->service_id); + + /*�ر�ģ�HLS .ts�����ݻỰ����һ���ֽ�Ϊ0x47*/ + /*����Ҫ�ж�range����Ƭ�������ٷ��߳�*/ + /*����Ҫ����lostlen�� lost֮�������룬ֱ�Ӳ��ش�*/ + if(frag->service_id==SERVICE_HLS_DATA && frag->cont_offset==0) + { + if(*data!=0x47) + { + return PROT_STATE_DROPME; + } + } + + /*ȷ��frag*/ + project_req_add_char(a_tcp, g_frag_prog_para.frag_project_id, 1); + + /*select DST*/ + switch(frag->media_type) + { + case FILE_IOS: + case FILE_ANDRIOD: + case FILE_APP: + DST = AV_DST_APP; + break; + + default: + DST = AV_DST_FRAGMENT_QUERY; + break; + } + + /*media_info*/ + if(!(frag->send_meta_flag)) + { + frag->av_info = (av_info_t*)dictator_malloc(thread_seq, sizeof(av_info_t)); + memset(frag->av_info,0,sizeof(av_info_t)); + + /*PID*/ + frag->av_info->media_info.pid = frag->tuple7_id; + /*protocol*/ + frag->av_info->media_info.protocol = frag->proto; + /*media_type*/ + frag->av_info->media_info.media_type = frag->media_type; + /*media_len*/ + if(a_http->cont_range!=NULL) + { + frag->av_info->media_info.prog_len = a_http->cont_range->len; + frag->cont_offset = a_http->cont_range->start; + } + else if(a_http->cont_length!=0) + { + frag->av_info->media_info.prog_len = a_http->cont_length; + } + else + { + frag->av_info->media_info.prog_len = MAX_UINT64_VAL; + } + + /*serviceID*/ + if(0!=frag->service_id) + { + create_media_opt(&(frag->av_info->media_optinfo),META_OPT_SERVICE_ID, serviceid, strlen(serviceid),thread_seq); + } + /*addr*/ + create_media_opt(&(frag->av_info->media_optinfo),META_OPT_LAYER_ADDR, (char*)(&a_tcp->addr), sizeof(a_tcp->addr), thread_seq); + /*url*/ + if(NULL!=frag->frag_opt[FRAG_URL].opt_value) + { + create_media_opt(&(frag->av_info->media_optinfo),META_OPT_TYPE_URL, frag->frag_opt[FRAG_URL].opt_value, frag->frag_opt[FRAG_URL].opt_len-1,thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[URL_META], 0, FS_OP_ADD, 1); + } + /*��������key*/ + else + { + gen_key = append_http_query_key(a_tcp, a_http->http_session_seq, query_url_key, sizeof(query_url_key)); + if(gen_key >= 0) + { + create_media_opt(&(frag->av_info->media_optinfo), META_OPT_SINGLE_KEY, query_url_key, strlen(query_url_key),thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[KEY_META], 0, FS_OP_ADD, 1); + } + } + /*ua*/ + if(NULL!=frag->frag_opt[FRAG_UA].opt_value) + { + create_media_opt(&(frag->av_info->media_optinfo),META_OPT_USER_AGENT, frag->frag_opt[FRAG_UA].opt_value, frag->frag_opt[FRAG_UA].opt_len-1,thread_seq); + } + /*referer*/ + if(NULL!=frag->frag_opt[FRAG_REFERER].opt_value) + { + create_media_opt(&(frag->av_info->media_optinfo),META_OPT_REFERER, frag->frag_opt[FRAG_REFERER].opt_value, frag->frag_opt[FRAG_REFERER].opt_len-1,thread_seq); + } + if(NULL!=frag->av_info->media_optinfo) + { + AV_send_media_info_to(DST, &(frag->av_info->media_info),frag->av_info->media_optinfo->opt,frag->av_info->media_optinfo->opt_num,thread_seq); + } + else + { + AV_send_media_info_to(DST, &(frag->av_info->media_info),NULL,0,thread_seq); + } + frag_write_to_log(FRAG_META_LOG, frag, a_http, a_tcp, thread_seq); + frag->send_meta_flag = 1; + + /*�����ݲŷ�Ԫ��Ϣ*/ + frag_write_to_log(FRAG_ZERO_LOG, frag, a_http, a_tcp,thread_seq); + } + AV_send_data_to(DST, frag->av_info->media_info.pid, + frag->cont_offset, + (const char*)data, + data_len, + frag->av_info->media_info.protocol, + thread_seq, (const struct layer_addr*)(&a_tcp->addr)); + frag->cont_offset += data_len; + /*stat*/ + switch(frag->media_type) + { + case FILE_OSMF: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + case FILE_HLS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + case FILE_IOS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + case FILE_ANDRIOD: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + case FILE_APP: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + case FILE_FRAG: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + case FILE_MAYBE_FRAG: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + default: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_PKTS], 0, FS_OP_ADD, 1); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_BYTES], 0, FS_OP_ADD, data_len); + break; + } + return PROT_STATE_GIVEME; +} + +int init_frag(frag_info_t **frag, int thread_seq) +{ + frag_info_t* pme = (frag_info_t*)dictator_malloc(thread_seq, sizeof(frag_info_t)); + memset(pme, 0 ,sizeof(frag_info_t)); + *frag = pme; + return 0; +} + +int release_frag(frag_info_t *frag, int thread_seq) +{ + if(frag == NULL) + { + return -1; + } + if(frag->mid != NULL) + { + Maat_clean_status(&(frag->mid)); + frag->mid = NULL; + } + for(int i=0;i<FRAG_OPR_NUM;i++) + { + if(NULL!=frag->frag_opt[i].opt_value) + { + dictator_free(thread_seq, frag->frag_opt[i].opt_value); + } + } + if(frag->av_info!= NULL) + { + if(NULL!=frag->av_info->media_optinfo) + { + destroy_media_opt(&(frag->av_info->media_optinfo), thread_seq); + frag->av_info->media_optinfo = NULL; + } + dictator_free(thread_seq,frag->av_info); + } + dictator_free(thread_seq, frag); + return 0; +} + +void free_media(void* data) +{ + if(NULL!=data) + { + free(data); + } +} + +int expire_media_hash_node(void *data, int eliminate_type) +{ + media_format_hnode_t* mdi = (media_format_hnode_t*)data; + + MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_INFO, FRGMNT_PLUGIN_NAME, + "{%s:%d} expire_media_hash_node %s: related_id: %llu, media_type:0x%02x]", + __FILE__,__LINE__, + hash_eliminate_type[eliminate_type], + mdi->relate_id, mdi->media_type); + return 1; +} + +uint64 make_frag_pid(frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + uint32 tcp_seq = 0; + uint32 tcp_seq_len = sizeof(uint32); + uchar proxy_type = 0; + int rec = 0; + + /*create tuple7_id*/ + if((a_tcp->pfather != NULL) && (a_tcp->pfather->type == STREAM_TYPE_HTTP_PROXY)) + { + proxy_type = a_tcp->pfather->type; + } + rec = MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, (void*)&tcp_seq, (int*)&tcp_seq_len); + if(rec < 0) + { + tcp_seq = 0; + } + snprintf(frag->tuple7,sizeof(frag->tuple7),"%s,%u,%u,%u",printaddr(&a_tcp->addr,thread_seq),tcp_seq,proxy_type,a_http->http_session_seq); + return AV_make_porg_id((unsigned char*)frag->tuple7, strlen(frag->tuple7), PID_TYPE_RESORT); +} + +/*compileֵԽ�����ȼ���Խ��*/ +int select_maat_result(struct Maat_rule_t maat_result[],int result_num) +{ + int best_idx = 0; + int max_config = maat_result[0].config_id; + for(int i=1;i<result_num;i++) + { + if(max_config<maat_result[i].config_id) + { + best_idx = i; + max_config = maat_result[i].config_id; + } + } + return best_idx; +} + +int is_frag_server_ip(struct streaminfo *a_tcp) +{ + int ret = -1; + Maat_rule_t result; + scan_status_t mid = NULL; + + ret = Maat_scan_proto_addr(g_AV_global_feather, + g_frag_prog_para.frag_serverip_tableid, + (struct ipaddr*)(&a_tcp->addr), + 6, + &result, + 1, + &mid, + a_tcp->threadnum); + + Maat_clean_status(&mid); + if(ret > 0) + { + return 1; + } + return 0; +} + +uchar set_frgmnt_hit(struct Maat_rule_t maat_result[], int result_num,frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + scan_status_t mid = NULL; + + int idx = select_maat_result(maat_result, result_num); + if(NULL!=maat_result) + { + frag->config_id = maat_result[idx].config_id; + frag->service_id = maat_result[idx].service_id; + frag->proto = AV_PROTOCOL_HTTP; + frag->media_type = maat_result[idx].do_blacklist; + /*���ݷ���index ����rssb*/ + frag->session_type = maat_result[idx].action; + /*0:������Ϣ 1/2:������Ϣ*/ + frag->data_dir = maat_result[idx].do_log; + } + + /*C2S��������IP����FRAG_SERVER_IP����Ҫ����ƬԤ�棬�����ĺ����ݵĶ�Ҫ��*/ + /*S2C��������IP����FRAG_SERVER_IP, ��Ϊ����Ƭ*/ + if(g_frag_prog_para.frag_forecast_switch && 0!=frag->data_dir) + { + if(is_frag_server_ip(a_tcp)) + { + frag->hit_server_ip = 1; + } + if(frag->media_type==FILE_MAYBE_FRAG) + { + /*FILE_FRAG_FORECAST�ǽ�������������,��S2CӦ���ʶ��ģ����ܴ���URL*/ + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HIT_SESSION], 0, FS_OP_ADD, 1); + if(a_http->p_url!=NULL) + { + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[URLDROP_SESSION], 0, FS_OP_ADD, 1); + return PROT_STATE_DROPME; + } + if(frag->hit_server_ip==0) + { + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[DROP_SESSION], 0, FS_OP_ADD, 1); + return PROT_STATE_DROPME; + } + } + } + + frag->proc_stat = FRAG_PROC_STAT_HIT; + frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); + + switch(frag->media_type) + { + case FILE_OSMF: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_HLS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_IOS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_ANDRIOD: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_APP: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_FRAG: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_MAYBE_FRAG: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_SESSION], 0, FS_OP_ADD, 1); + break; + default: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_SESSION], 0, FS_OP_ADD, 1); + break; + } + if(frag->session_type==SESSION_FRGMNT_INDEX) + { + switch(frag->media_type) + { + case FILE_HLS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_INDEX_SESSION], 0, FS_OP_ADD, 1); + if(a_http->cont_encoding==HTTP_CONT_ENCOD_GZIP) + { + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[INDEX_GZIP], 0, FS_OP_ADD, 1); + } + break; + default: + break; + } + } + else + { + switch(frag->media_type) + { + case FILE_OSMF: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_HLS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_IOS: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_ANDRIOD: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_APP: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_FRAG: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + case FILE_MAYBE_FRAG: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + default: + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_SESSION], 0, FS_OP_ADD, 1); + break; + } + } +} + +int set_osmf_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + frag->config_id = 0; + frag->service_id = SERVICE_OSMF_DATA; + frag->proto = AV_PROTOCOL_HTTP; + frag->media_type = FILE_OSMF; + frag->session_type = SESSION_FRGMNT_DATA; + frag->data_dir = FRAG_TYPE_DATA_S2C; + + frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1); + return 1; +} + +/* +int set_hls_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + frag->config_id = 0; + frag->service_id = SERVICE_HLS_DATA; + frag->proto = AV_PROTOCOL_HTTP; + frag->media_type = FILE_HLS; + frag->session_type = SESSION_FRGMNT_DATA; + frag->data_dir = FRAG_TYPE_DATA_S2C; + + frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1); + return 1; +} +*/ + +int set_app_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + frag->config_id = 0; + frag->service_id = SERVICE_APP; + frag->proto = AV_PROTOCOL_HTTP; + frag->media_type = FILE_APP; + frag->session_type = SESSION_FRGMNT_UNKNOW; + frag->data_dir = FRAG_TYPE_DATA_S2C; + + frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_SESSION], 0, FS_OP_ADD, 1); + return 1; +} + +static int url_indicated_app(const char *url) +{ + const char *pfilename = NULL, *p1 = NULL, *p2 = NULL, *p3 = NULL, *p = NULL; + unsigned int len=0; + + if(url == NULL) + return 0; + + pfilename = strrchr(url, '/'); + if(pfilename == NULL || *(pfilename + 1) == '\0') + return 0; + + pfilename = pfilename + 1; + len = strlen(pfilename); + + p1 = strchr(pfilename, ';'); + p2 = strchr(pfilename, '?'); + p3 = strchr(pfilename, '#'); + + if(p1>p2) + { + p = p1; p1 = p2; p2 = p; + } + if(p1>p3) + { + p = p3; p1 = p3; p3 = p; + } + if(p2>p3) + { + p = p2; p2 = p3; p3 = p; + } + if(p1 != NULL) + len = p1 - pfilename; + else if(p2 != NULL) + len = p2 - pfilename; + else if(p3 != NULL) + len = p3 - pfilename; + + if(!strncasecmp(pfilename + len - 4, ".apk", 4)) + return FILE_ANDRIOD; + if(!strncasecmp(pfilename + len - 4, ".ipa", 4)) + return FILE_IOS; + return 0; +} + +void create_related_id(frag_info_t *frag, http_infor* a_http, struct streaminfo *a_tcp) +{ + if(frag->related_id!=0) return; + uint64 cont_len = a_http->cont_length; + + /*��¼�ļ����ȣ���������related_id, ֻ��������S2C������������content_range��lenΪ�ļ��ܳ���*/ + if(a_http->cont_range!=NULL) + { + cont_len = a_http->cont_range->len; + } + + if(cont_len==0) return; + + struct layer_addr* addr = &a_tcp->addr; + uchar pid_buf[PID_BUF_LEN] = {0}; + if(addr->addrtype == ADDR_TYPE_IPV4) + { + struct tuple4* tuple4 = (struct tuple4 *)(addr->paddr); + char ip4saddr[IPV4_ADDR_LEN] = {0}; + char ip4daddr[IPV4_ADDR_LEN] = {0}; + inet_ntop(AF_INET, &tuple4->saddr, ip4saddr, IPV4_ADDR_LEN); + inet_ntop(AF_INET, &tuple4->daddr, ip4daddr, IPV4_ADDR_LEN); + snprintf((char*)pid_buf, PID_BUF_LEN, "IPV4%s,%s,%llu", ip4saddr, ip4daddr, cont_len); + } + else if(addr->addrtype == ADDR_TYPE_IPV6) + { + struct tuple6* tuple6 = (struct tuple6 *)(addr->paddr); + char ip6saddr[IPV6_ADDR_LEN] = {0}; + char ip6daddr[IPV6_ADDR_LEN] = {0}; + inet_ntop(AF_INET6, &tuple6->saddr, ip6saddr, IPV6_ADDR_LEN); + inet_ntop(AF_INET6, &tuple6->daddr, ip6daddr, IPV6_ADDR_LEN); + snprintf((char*)pid_buf, PID_BUF_LEN, "IPV6%s,%s,%llu", ip6saddr, ip6daddr, cont_len); + } + else + { + return; + } + frag->related_id = AV_make_porg_id(pid_buf, (unsigned int)strlen((const char*)pid_buf), PID_TYPE_RESORT); +} + +long media_hash_search_cb(void *data, const uint8_t *key, uint size, void *user_arg) +{ + frag_info_t* frag = (frag_info_t*)user_arg; + if(NULL!=data) + { + frag->media_type = ((media_format_hnode_t*)data)->media_type; + return 1; + } + return 0; +} + +int frag_multithread_hash_search(const char *pHeader, int iHeaderLen, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + if(!g_frag_prog_para.app_switch && !g_frag_prog_para.osmf_data_feature_switch) return 0; + + long rec_cb = 0; + char* find_pos = NULL; + media_format_hnode_t* media_format_hnode = NULL; + + create_related_id(frag, a_http, a_tcp); + if(0!=frag->related_id) + { + /*���߳���������£���ѯmedia_hash����ʶ��*/ + MESA_htable_search_cb(g_frag_prog_para.media_hash, + (const uint8_t *)&frag->related_id, + sizeof(frag->related_id), + media_hash_search_cb, + (void*)frag, + &rec_cb); + if(rec_cb==0) + { + /*����������ƥ��*/ + if(g_frag_prog_para.app_switch && g_frag_prog_para.app_gzip_switch) + { + find_pos = (char*)memmem(pHeader, iHeaderLen, g_app_feature, sizeof(g_app_feature)); + if(NULL!=find_pos) + { + frag->media_type = FILE_APP; + } + } + if(g_frag_prog_para.osmf_data_feature_switch) + { + find_pos = (char*)memmem(pHeader, iHeaderLen, g_osmf_feature, sizeof(g_osmf_feature)); + if(NULL!=find_pos) + { + frag->media_type = FILE_OSMF; + } + } + if(NULL!=find_pos) + { + /*add prog_id , ��ͨ���߳��ļ�����*/ + media_format_hnode = (media_format_hnode_t*)malloc(sizeof(media_format_hnode_t)); + media_format_hnode->media_type = frag->media_type; + media_format_hnode->relate_id = frag->related_id; + MESA_htable_add(g_frag_prog_para.media_hash, (const unsigned char*)&frag->related_id, sizeof(frag->related_id), (const void*)media_format_hnode); + /*write log*/ + MESA_handle_runtime_log(g_frag_prog_para.logger,RLOG_LV_INFO,FRGMNT_PLUGIN_NAME, + (char*)"[%s:%d] media_format_indentify MESA_htable_add related_id:%llu, media_type:0x%02x, url:%s." , + __FILE__,__LINE__, frag->related_id, frag->media_type, a_http->p_url); + } + } + + /*����������*/ + if(rec_cb || NULL!=find_pos) + { + switch(frag->media_type) + { + case FILE_OSMF: + set_osmf_hit(frag, a_http, a_tcp, thread_seq); + return 1; + break; + + case FILE_APP: + set_app_hit(frag, a_http, a_tcp, thread_seq); + frag->session_type = SESSION_FRGMNT_DATA; + return 1; + break; + + default: + break; + } + } + } + return 0; +} + +char frag_scan(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + int ret = 0; + uchar return_value = PROT_STATE_GIVEME; + const char* scan_data = NULL; + uint32 scan_datalen = 0; + const char* region_name = {0}; + uint32 region_name_len = 0; + int found_pos[MAAT_RESULT_NUM] = {0}; + int hit_result = 0; + struct Maat_rule_t maat_result[MAAT_RESULT_NUM]; + char url[2048] = {0}; + + memset(&maat_result, 0, sizeof(maat_result)); + /*scan preproc*/ + switch(session_info->prot_flag) + { + case HTTP_CONTENT: + /*����ֻɨ���װ�*/ + if((session_info->prot_flag==HTTP_CONTENT||session_info->prot_flag==HTTP_UNGZIP_CONTENT) + && a_http->curdir==DIR_S2C && frag->s2c_first_pkt==0) + { + frag->s2c_first_pkt = 1; + scan_data = (char*)session_info->buf; + scan_datalen = session_info->buflen; + } + break; + + case HTTP_CONT_LENGTH: + case HTTP_CONT_RANGE: + case HTTP_USER_AGENT: + case HTTP_REFERER: + case HTTP_COOKIE: + case HTTP_HOST: + /*����Ҫɨ��*/ + break; + case HTTP_CONT_TYPE: + if(a_http->curdir==DIR_S2C) + { + scan_data = (char*)session_info->buf; + scan_datalen = session_info->buflen; + } + break; + + default: + if(g_frag_prog_para.app_switch && session_info->prot_flag==HTTP_MESSAGE_URL) + { + memcpy(url, session_info->buf, MIN((int)session_info->buflen, (int)(sizeof(url)-1))); + frag->media_type = url_indicated_app(url); + if(frag->media_type) + { + hit_result = set_app_hit(frag,a_http,a_tcp,thread_seq); + } + } + if(!frag->media_type) + { + scan_data = (char*)session_info->buf; + scan_datalen = session_info->buflen; + } + break; + } + + /*scan*/ + if(NULL!=scan_data) + { + /* ����ʽɨ��*/ + region_name = http_proto_flag2region(session_info->prot_flag); + region_name_len = strlen(region_name); + ret=Maat_set_scan_status(g_frag_prog_para.feather, &frag->mid, MAAT_SET_SCAN_DISTRICT,region_name,strlen(region_name)); + if(ret<0) + { + return PROT_STATE_DROPME; + } + hit_result = Maat_full_scan_string( g_frag_prog_para.feather, g_frag_prog_para.expr_tableid, CHARSET_GBK, + scan_data, scan_datalen, + maat_result, found_pos, MAAT_RESULT_NUM, &(frag->mid),thread_seq); + /*IP��ַɨ��*/ + if(g_frag_prog_para.ip_switch && hit_result<=0 && frag->s2c_first_pkt) + { + /*S2C�װ������У�ɨ��IP��ַ*/ + hit_result = Maat_scan_proto_addr(g_frag_prog_para.feather,g_frag_prog_para.ip_tableid, + (struct ipaddr*)&a_tcp->addr, TCL_PROTOCOL, + maat_result,MAAT_RESULT_NUM,&(frag->mid),thread_seq); + } + + /* ���߳������HASH����*/ + if(hit_result<=0 && frag->s2c_first_pkt) + { + hit_result = frag_multithread_hash_search(scan_data,scan_datalen,frag,a_http,a_tcp,thread_seq); + /*����ɨ�裬DROP�Ự*/ + if(hit_result<=0) + { + //if(scan_data!=NULL && *scan_data==0x47) + //{ + // hit_result = set_hls_hit(frag, a_http, a_tcp, thread_seq); + //} + //else + //{ + return PROT_STATE_DROPME; + //} + } + } + } + + if(hit_result>0) + { + return_value = set_frgmnt_hit(maat_result,hit_result,frag,a_http,a_tcp,thread_seq); + } + return return_value; +} + +char frag_save(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + int opt_type = -1; + + switch(session_info->prot_flag) + { + case HTTP_MESSAGE_URL: + opt_type = FRAG_URL; + break; + case HTTP_HOST: + opt_type = FRAG_HOST; + break; + case HTTP_USER_AGENT: + opt_type = FRAG_UA; + break; + case HTTP_REFERER: + opt_type = FRAG_REFERER; + break; + case HTTP_CONT_TYPE: + if(a_http->curdir==DIR_S2C) + { + opt_type = FRAG_S2C_CONTENT_TYPE; + } + break; + default: + break; + } + if(-1!=opt_type && frag->frag_opt[opt_type].opt_value==NULL) + { + frag->frag_opt[opt_type].opt_value = (char*)dictator_malloc(thread_seq, session_info->buflen+1); + memcpy(frag->frag_opt[opt_type].opt_value, session_info->buf, session_info->buflen); + frag->frag_opt[opt_type].opt_value[session_info->buflen] = '\0'; + frag->frag_opt[opt_type].opt_len = session_info->buflen+1; + } + + return 0; +} + +uchar frag_sendback(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) +{ + uchar return_stat = PROT_STATE_GIVEME; + + /*����������Ϣ����ȷ��Ŀ�ĵأ�ͨ��a_tcp->dir�ж��Ƿ��ǵ������پ���Ŀ�ĵ�*/ + if(frag->session_type==SESSION_FRGMNT_UNKNOW) + { + if(a_tcp->dir == DIR_DOUBLE) + { + frag->session_type = SESSION_FRGMNT_DATA; + } + else + { + frag->session_type = SESSION_FRGMNT_INDEX; + } + } + + /*change because of a_tcp->dir is not accurate*/ + if(frag->session_type == SESSION_FRGMNT_INDEX) + { + if(a_tcp->dir==DIR_DOUBLE && frag->data_dir!=0) + { + frag->session_type = SESSION_FRGMNT_DATA; + } + } + + + /*index need cache;*/ + if(frag->session_type == SESSION_FRGMNT_INDEX) + { + if(DIR_S2C==a_http->curdir && (session_info->prot_flag==HTTP_CONTENT || session_info->prot_flag==HTTP_UNGZIP_CONTENT)) + { + if(frag->frag_opt[FRAG_S2C_CONTENT].opt_len<g_frag_prog_para.s2c_cache_size) + { + frag->frag_opt[FRAG_S2C_CONTENT].opt_value = (char*)dictator_realloc(thread_seq, frag->frag_opt[FRAG_S2C_CONTENT].opt_value, frag->frag_opt[FRAG_S2C_CONTENT].opt_len+session_info->buflen); + memcpy(frag->frag_opt[FRAG_S2C_CONTENT].opt_value+frag->frag_opt[FRAG_S2C_CONTENT].opt_len, session_info->buf, session_info->buflen); + frag->frag_opt[FRAG_S2C_CONTENT].opt_len += session_info->buflen; + } + else + { + frag->proc_stat = FRAG_PROC_STAT_SCAN; + return PROT_STATE_DROPME; + } + } + } + + /*send ȷ������CPZ*/ + if(frag->session_type==SESSION_FRGMNT_DATA) + { + if(a_http->res_code==200 || a_http->res_code==206 || a_http->res_code==0) + { + if(session_info->prot_flag==HTTP_CONTENT || session_info->prot_flag==HTTP_UNGZIP_CONTENT) + { + return_stat = frgmnt_av_sendback(frag,(char*)session_info->buf,session_info->buflen,a_http,a_tcp,thread_seq); + } + } + else + { + frag->proc_stat = FRAG_PROC_STAT_SCAN; + return_stat = PROT_STATE_DROPME; + } + } + return return_stat; +} + +uchar FRAG_MONITOR_ENTRY(stSessionInfo* session_info, void **param,int thread_seq, struct streaminfo *a_tcp, void *a_packet) +{ + if(NULL==session_info) + { + return PROT_STATE_DROPME; + } + uchar return_stat = PROT_STATE_GIVEME; + http_infor* a_http = (http_infor *)(session_info->app_info); + frag_info_t* frag = (frag_info_t*)*param; + + /*other HTTP, do not proc*/ + if(a_http->method!=HTTP_METHOD_UNKNOWN&&a_http->method!=HTTP_METHOD_GET) + { + return PROT_STATE_DROPME; + } + + /*init*/ + if(frag==NULL) + { + if(init_frag(&frag, thread_seq) <0) + { + return PROT_STATE_DROPME; + } + *param = frag; + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HTTP_SESSION], 0, FS_OP_ADD, 1); + } + + if(session_info->prot_flag!=HTTP_STATE && frag->proc_stat==FRAG_PROC_STAT_SCAN) + { + return_stat = frag_scan(session_info,frag,a_http,a_tcp,thread_seq); + if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION; + if(frag->proc_stat==FRAG_PROC_STAT_HIT) + { + frag->proc_stat = FRAG_PROC_STAT_SEND; + } + } + + /*��ƬԪ��Ϣ�洢*/ + frag_save(session_info,frag,a_http,a_tcp,thread_seq); + + /*�ȴ�refer,��FD��־*/ + if(frag->proc_stat==FRAG_PROC_STAT_SEND) + { + /*��Ƭ���ܿ�*/ + if(NULL!=a_http->p_url + && DIR_C2S==a_http->curdir + && session_info->prot_flag==HTTP_STATE + && HTTP_DATA_BEGIN==a_http->http_state) + { + return_stat = frag_check_block(frag, a_http, a_tcp, a_packet, thread_seq); + if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION; + } + } + + if(session_info->prot_flag!=HTTP_STATE && frag->proc_stat==FRAG_PROC_STAT_SEND) + { + /*���ݻش�*/ + return_stat = frag_sendback(session_info,frag,a_http,a_tcp,thread_seq); + if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION; + } + +DROP_SESSION: + /*release*/ + if(return_stat==PROT_STATE_DROPME||session_info->session_state&SESSION_STATE_CLOSE) + { + /*ȷ��frag*/ + project_req_add_char(a_tcp, g_frag_prog_para.frag_project_id, 0); + + if(frag->proc_stat==FRAG_PROC_STAT_SEND) + { + if(frag->session_type==SESSION_FRGMNT_INDEX) + { + frgmnt_index_sendback(frag,a_http,a_tcp,thread_seq); + frag_write_to_log(FRAG_INDEX_LOG, frag, a_http, a_tcp, thread_seq); + } + /*��������״̬������kafka*/ + if(g_frag_prog_para.frag_report_switch) + { + frag_json_report(LOCAL_LOG_TYPE_FRAG_FIND_PROG,frag, a_http, a_tcp, thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_PROG_TOPIC], 0, FS_OP_ADD, 1); + } + /*��ƬԤ�淢��kafka,��ʱֻ����Ƭ��*/ + if(g_frag_prog_para.frag_forecast_switch && 0!=frag->data_dir && a_http->p_url!=NULL && frag->hit_server_ip==0) + { + frag_json_report(LOCAL_LOG_TYPE_FRAG_SERVER_IP,frag, a_http, a_tcp, thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_URL_TOPIC], 0, FS_OP_ADD, 1); + } + } + release_frag((frag_info_t*)*param, thread_seq); + FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HTTP_SESSION_CLOSE], 0, FS_OP_ADD, 1); + *param = NULL; + } + return return_stat; +} + +int read_main_conf() +{ + char log_filename[MAX_PATH_LEN]={0}; + char table_info_filename [MAX_PATH_LEN]={0}; + char config_buff[MAX_PATH_LEN]={0}; + int log_level = 0; + + g_frag_prog_para.avconf_handle = wired_cfg_create("AV_CONF", "./avconf/av.conf"); + wired_cfg_init(g_frag_prog_para.avconf_handle); + + g_frag_prog_para.avcomconf_handle = wired_cfg_create("AV_COM", "./avconf/av_com.conf"); + wired_cfg_init(g_frag_prog_para.avcomconf_handle); + + //LOG + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.log_switch = (short)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogLevel",config_buff,sizeof(config_buff),"30"); + log_level = (short)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogPath",g_frag_prog_para.log_path,sizeof(config_buff),"./avlog/frag_monitor/"); + snprintf(log_filename, sizeof(log_filename), "%s/%s", g_frag_prog_para.log_path, "runtime.log"); + g_frag_prog_para.logger = MESA_create_runtime_log_handle(log_filename,log_level); + if(NULL==g_frag_prog_para.logger) + { + printf("MESA_create_runtime_log_handle error.\n"); + return -1; + } + + /*maat*/ +#if K_PROJECT + g_frag_prog_para.frag_url_table_id = Maat_table_register(g_AV_global_feather, "MM_GLOBAL_FRAG_INDEX_URL"); +#else + g_frag_prog_para.frag_url_table_id = Maat_table_register(g_AV_global_feather, "GLOBAL_FRAG_INDEX_URL"); +#endif + int value = 0; + g_frag_prog_para.fs_handle = FS_create_handle(); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatFile",config_buff,sizeof(config_buff),"./avlog/frag_monitor/frag_monitor_status.log"); + FS_set_para(g_frag_prog_para.fs_handle, OUTPUT_DEVICE, config_buff, strlen(config_buff)+1); + value = 1;//flush by date + FS_set_para(g_frag_prog_para.fs_handle, FLUSH_BY_DATE, &value, sizeof(value)); + value = 2;//append + FS_set_para(g_frag_prog_para.fs_handle, PRINT_MODE, &value, sizeof(value)); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatInterval",config_buff,sizeof(config_buff),"30"); + value = atoi(config_buff); + FS_set_para(g_frag_prog_para.fs_handle, STAT_CYCLE, &value, sizeof(value)); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatSwitch",config_buff,sizeof(config_buff),"1"); + value = atoi(config_buff); + FS_set_para(g_frag_prog_para.fs_handle, PRINT_TRIGGER, &value, sizeof(value)); + if(value) + { + /*���ܿؼ��һ��*/ + wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","fs2_ip",config_buff,sizeof(config_buff),""); + FS_set_para(g_frag_prog_para.fs_handle, STATS_SERVER_IP, config_buff, strlen(config_buff)+1); + wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","fs2_port",config_buff,sizeof(config_buff),"8125"); + unsigned short port = atoi(config_buff); + FS_set_para(g_frag_prog_para.fs_handle, STATS_SERVER_PORT, &port, sizeof(port)); + } + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FRAG_NAME",config_buff,sizeof(config_buff),"frag_monitor"); + FS_set_para(g_frag_prog_para.fs_handle, APP_NAME, config_buff, strlen(config_buff)+1); + value = 1; + FS_set_para(g_frag_prog_para.fs_handle, CREATE_THREAD, &value, sizeof(value)); + g_frag_prog_para.fs_id[HTTP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HTTP_links"); + g_frag_prog_para.fs_id[HLS_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_links"); + g_frag_prog_para.fs_id[OSMF_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_links"); + g_frag_prog_para.fs_id[IOS_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_links"); + g_frag_prog_para.fs_id[ANDRIOD_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_links"); + g_frag_prog_para.fs_id[APP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_links"); + g_frag_prog_para.fs_id[HIT_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HIT_links"); + g_frag_prog_para.fs_id[URLDROP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "URLDROP_links"); + g_frag_prog_para.fs_id[DROP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP_links"); + g_frag_prog_para.fs_id[FRAG_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_links"); + g_frag_prog_para.fs_id[MAYBE_FRAG_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MATBE_links"); + g_frag_prog_para.fs_id[WEIRD_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_links"); + g_frag_prog_para.fs_id[HLS_INDEX_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_index"); + g_frag_prog_para.fs_id[HLS_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_data"); + g_frag_prog_para.fs_id[OSMF_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_data"); + g_frag_prog_para.fs_id[IOS_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_data"); + g_frag_prog_para.fs_id[ANDRIOD_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_data"); + g_frag_prog_para.fs_id[APP_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_data"); + g_frag_prog_para.fs_id[FRAG_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_data"); + g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MATBE_data"); + g_frag_prog_para.fs_id[WEIRD_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_data"); + g_frag_prog_para.fs_id[HLS_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_d_pkts"); + g_frag_prog_para.fs_id[HLS_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_d_bytes"); + g_frag_prog_para.fs_id[OSMF_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_d_pkts"); + g_frag_prog_para.fs_id[OSMF_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_d_bytes"); + g_frag_prog_para.fs_id[IOS_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_d_pkts"); + g_frag_prog_para.fs_id[IOS_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_d_bytes"); + g_frag_prog_para.fs_id[ANDRIOD_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_d_pkts"); + g_frag_prog_para.fs_id[ANDRIOD_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_d_bytes"); + g_frag_prog_para.fs_id[APP_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_d_pkts"); + g_frag_prog_para.fs_id[APP_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_d_bytes"); + g_frag_prog_para.fs_id[FRAG_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_d_pkts"); + g_frag_prog_para.fs_id[FRAG_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_d_bytes"); + g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MAYBE_d_pkts"); + g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MAYBE_d_bytes"); + g_frag_prog_para.fs_id[WEIRD_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_d_pkts"); + g_frag_prog_para.fs_id[WEIRD_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_d_bytes"); + g_frag_prog_para.fs_id[INDEX_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "index_topic"); + g_frag_prog_para.fs_id[CNVG_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "cnvg_topic"); + g_frag_prog_para.fs_id[FRAG_PROG_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "prog_topic"); + g_frag_prog_para.fs_id[FRAG_URL_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "url_topic"); + g_frag_prog_para.fs_id[URL_META] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "url_meta"); + g_frag_prog_para.fs_id[KEY_META] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "key_meta"); + g_frag_prog_para.fs_id[HTTP_SESSION_CLOSE] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HTTP_close"); + g_frag_prog_para.fs_id[INDEX_GZIP] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "index_gzip"); + FS_start(g_frag_prog_para.fs_handle); + + /*media_hash*/ + uint32_t media_hash_size = 0; + uint32_t media_hash_max_elem_num = 0; + uint32_t media_hash_expire_time = 0; + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashSize",config_buff,sizeof(config_buff),"65536"); + media_hash_size = atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashElemNum",config_buff,sizeof(config_buff),"1048576"); + media_hash_max_elem_num = atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashExpireTime",config_buff,sizeof(config_buff),"120"); + media_hash_expire_time = atoi(config_buff); + MESA_htable_create_args_t hash_args; + memset(&hash_args,0,sizeof(MESA_htable_create_args_t)); + hash_args.thread_safe = 1; //group lock + hash_args.recursive = 1; + hash_args.hash_slot_size = media_hash_size; + hash_args.max_elem_num = media_hash_max_elem_num; + hash_args.eliminate_type = HASH_ELIMINATE_ALGO_LRU; + hash_args.expire_time = media_hash_expire_time; + hash_args.key_comp = NULL; + hash_args.key2index = NULL; + hash_args.data_free = free_media; + hash_args.data_expire_with_condition = expire_media_hash_node; + g_frag_prog_para.media_hash = MESA_htable_create(&hash_args, sizeof(hash_args)); + if(NULL==g_frag_prog_para.media_hash) + { + printf("create media_hash error.\n"); + return -1; + } + + //SYSTEM + /*app switch*/ + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","AppSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.app_switch = (short)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","AppGzipFeatureSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.app_gzip_switch = (short)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","OsmfDataFeatureSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.osmf_data_feature_switch = (short)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","IPSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.ip_switch = (short)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","S2cCacheSize",config_buff,sizeof(config_buff),"500000"); + g_frag_prog_para.s2c_cache_size = (uint32)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragReportSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.frag_report_switch = (uint32)atoi(config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragForecastSwitch",config_buff,sizeof(config_buff),"0"); + g_frag_prog_para.frag_forecast_switch = (uint32)atoi(config_buff); + + /*maat*/ + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatTableInfo",table_info_filename,sizeof(table_info_filename),"./avconf/frag_monitor/table_info.conf"); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragMonitorJSON",config_buff,sizeof(config_buff),"./avconf/frag_monitor/frag_monitor.json"); + int scan_detail = 0; + g_frag_prog_para.feather = Maat_feather(g_iThreadNum, table_info_filename, g_frag_prog_para.logger); + Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_JSON_FILE_PATH, config_buff, strlen(config_buff)+1); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatStatFile",config_buff,sizeof(config_buff),"./avlog/frag_monitor/maat_stat.log"); + Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_STAT_FILE_PATH, config_buff, strlen(config_buff)+1); + int temp = 0; + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatStatSwitch",config_buff,sizeof(config_buff),"0"); + temp = atoi(config_buff); + if(temp) + { + Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_STAT_ON, NULL, 0); + } + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatPerfSwitch",config_buff,sizeof(config_buff),"0"); + temp = atoi(config_buff); + if(temp) + { + Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_PERF_ON, NULL, 0); + } + Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail)); + Maat_initiate_feather(g_frag_prog_para.feather); + if(NULL==g_frag_prog_para.feather) + { + printf("Maat_summon_feather_json error.\n"); + return -1; + } + g_frag_prog_para.expr_tableid = Maat_table_register(g_frag_prog_para.feather,"FRAG_MONITOR_KEYWORDS"); + g_frag_prog_para.ip_tableid = Maat_table_register(g_frag_prog_para.feather,"FRAG_MONITOR_IP"); + + //��ƬԤ�� + g_frag_prog_para.frag_serverip_tableid = Maat_table_register(g_AV_global_feather,"FRAG_SERVER_IP"); + + //NETWORK + //get locat IP + wired_cfg_read(g_frag_prog_para.avconf_handle,"Module","SendBack_Device",config_buff,sizeof(config_buff),"mg0"); + g_frag_prog_para.local_ip_nr = get_ip_by_ifname(config_buff); + if(g_frag_prog_para.local_ip_nr==INADDR_NONE) + { + printf("get [NETWORK]LocalIP error.\n"); + return -1; + } + + // index Kafka��ʼ��// + /* + wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0"); + g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers); + if(NULL==g_frag_prog_para.index_kafka_producer) + { + printf("KafkaProducer error.\n"); + return -1; + } + if(0!=g_frag_prog_para.index_kafka_producer->KafkaConnection()) + { + printf("KafkaConnection %s error.\n", g_frag_prog_para.index_brokers); + MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME, + "{%s:%d} KafkaConnection %s error.", + __FILE__,__LINE__, g_frag_prog_para.index_brokers); + } + else + { + printf("KafkaConnection %s succ.\n", g_frag_prog_para.index_brokers); + MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME, + "{%s:%d} KafkaConnection %s succ.", + __FILE__,__LINE__, g_frag_prog_para.index_brokers); + } + */ + + //TRACE + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf"); + g_frag_prog_para.addr_trace = MESA_trace_create_addr_handle(g_frag_prog_para.logger, config_buff); + wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceLogFile",g_frag_prog_para.trace_dir, sizeof(g_frag_prog_para.trace_dir),"./avlog/frag_monitor/"); + + return 0; +} + +extern "C" int FRAG_MONITOR_INIT() +{ + if(-1==read_main_conf()) + { + return -1; + } + + /*kafka*/ + if(NULL!=g_multi_kafka_producer) + { + /* + if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL) + { + printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA); + return -1; + } + if((g_multi_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL) + { + printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA); + return -1; + } + */ + rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new(); + g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf); + frag_topic_conf = rd_kafka_topic_conf_new(); + g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf); + } + g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL); + + return 0; +} + +extern "C" void FRAG_MONITOR_DESTROY(void) +{ + +} + + |
