//author: lishu //date: 2015-9-23 #include #include #include #include #include #include #include #include #include #include #include #include "MESA_handle_logger.h" #include "MESA_htable.h" #include "MESA_prof_load.h" #include "MESA_trace.h" #include "Maat_rule.h" #include "wired_cfg.h" #include "field_stat2.h" #include "http.h" #include "AV_feedback.h" #include "AV_charater.h" #include "AV_prog_table.h" #include "AV_log.h" #include "AV_kill_connection.h" #include "cJSON.h" #include "KafkaProducer.h" #include "opt.h" #include "opt_in.h" #include "optregister.h" #include "frag_monitor.h" #include "frag_common.h" #include "frag_block.h" int FRAG_MONITOR_VERSION_2_0_20191126 = 0; void frag_monitor_history() { //2015.09.23 create the project //2016.05.04 pid add tcp seq //2016.05.09 add send switch, add static //2016.05.11 add mediainfo layer_addr //2016.05.13 proc_frgmnt_hit log //2016.08.16 content-type sendback //2016.08.26 内容特征域多线程下载识别 //2016.11.02 302的内容不是音视频,不作处理 //2017.01.03 mesa_tcp lock, 使用field_state统计 //2017.02.21 support app //2017.02.24 add switch to close log //2017.03.02 cache index info and sendto kafka //2017.03.22 JSON: ISN ADDR_LIST PROTOCOL PROXY -1 to 48 bit //2017.04.07 1. log thread safe 2. media_type not protocol //2017.04.11 1. add topic stat //2017.04.12 1. tuple7 //2017.04.17 1. cJSON_print free //2017.04.25 1. HLS OSMF protocol as HTTP //2017.05.03 1. index gzip static //2017.05.15 1. OSMF feature change to f4f 2. hls tpl //2017.05.24 1. frag_fd //2017.07.13 1. add similar scan //2017.09.06 1. delete c2s_content 2. time //2017.09.11 1. reconstruction //2017.09.23 1. test //2017.09.25 1. MEDIA_CNVG_TOPIC 2. delete .inf 3.hls_data_s2c //2017.09.29 1. change index->data when double dir 2. frag_monitor.conf 3. frag_report_switch //2017.11.14 1. add frag_flag in http session //2017.11.30 1. add ts_data feature //2017.12.08 1. modify the method that tuple7 generate pid //2018.01.12 1. field_stat monitor 2. 
add url_key opt when send meta //2018.01.29 1. use kafka_handle defined by av_master //2018.03.15 1. support other type //2018.03.16 1. add frag notice //2018.05.09 1. add frag_json_report, addr+scan server_ip //2018.07.10 1. 1. refer send to kafka 2. close frag forecast //2018.07.24 1. 1. create maat feather //2018.11.12 1. 1. set K_PROJECT because different maat table //2019.08.02 1. 1. maat service_defined //2019.11.26 send single_key when double-way to cpz and kafka } frag_monitor_runtime_parameter_t g_frag_prog_para; /*sapp threadnum*/ static int g_iThreadNum = 1; /*av_master*/ //extern KafkaProducer* g_multi_kafka_producer; extern rd_kafka_t* g_multi_kafka_producer; /*av master*/ extern Maat_feather_t g_AV_global_feather; const char g_app_feature[4] = {0X50, 0X4B, 0X03, 0X04}; const char g_osmf_feature[4] = {0X6D, 0x64, 0x61, 0x74}; const char* g_log_des[FRAG_LOG_NUM] = { "index", "rssb_meta", "rssb_zero" }; const char* hash_eliminate_type[3] = { "", "ELIMINATE_TYPE_NUM", "ELIMINATE_TYPE_TIME" }; void frag_write_to_log(int type, frag_info_t* frag, http_infor* a_http,struct streaminfo *a_tcp, int thread_seq) { if(!g_frag_prog_para.log_switch) return; char buf[2048] = {0}; int buflen = 0; time_t cur_time; struct tm now; char now_time[32] = {0}; char day_time[32] = {0}; char filename[MAX_PATH_LEN] = {0}; time(&cur_time); localtime_r(&cur_time, &now); strftime(now_time, sizeof(now_time), "%Y-%m-%d %H:%M:%S", &now); /*init*/ if(NULL==g_frag_prog_para.fraginfo_file[thread_seq]) { strftime(day_time, sizeof(day_time), "%Y-%m-%d", &now); snprintf(filename, sizeof(filename), "%s/%s_%d.%s", g_frag_prog_para.log_path, "fraginfo" ,thread_seq,day_time); g_frag_prog_para.fraginfo_file[thread_seq] = fopen(filename, "a+"); localtime_r(&cur_time, &g_frag_prog_para.fraginfo_filetime[thread_seq]); } if(now.tm_mday!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_mday || now.tm_mon!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_mon || 
now.tm_year!=g_frag_prog_para.fraginfo_filetime[thread_seq].tm_year) { if(g_frag_prog_para.fraginfo_file[thread_seq]) { strftime(day_time, sizeof(day_time), "%Y-%m-%d", &now); snprintf(filename, sizeof(filename), "%s/%s_%d.%s", g_frag_prog_para.log_path, "fraginfo" ,thread_seq,day_time); fclose(g_frag_prog_para.fraginfo_file[thread_seq]); g_frag_prog_para.fraginfo_file[thread_seq] = fopen(filename, "a+"); } localtime_r(&cur_time, &g_frag_prog_para.fraginfo_filetime[thread_seq]); } if(NULL!=frag->frag_opt[FRAG_URL].opt_value) { if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value) { buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s url:%s s2c_content_type:%s\n", now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_URL].opt_value,frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value); } else { buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s url:%s\n", now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_URL].opt_value); } } else { if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value) { buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s s2c_content_type:%s\n", now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, frag->media_type, frag->service_id, frag->session_type, frag->tuple7, frag->frag_opt[FRAG_S2C_CONTENT_TYPE].opt_value); } else { buflen = snprintf(buf, sizeof(buf), "%20s%10s dir:%1d PID:%20llu configID:%5d media_type:%5hhu service_id:%5u session_type:%5u addr_list:%20s\n", now_time, g_log_des[type], a_tcp->dir, frag->tuple7_id, frag->config_id, 
frag->media_type, frag->service_id, frag->session_type, frag->tuple7); } } if(NULL!=g_frag_prog_para.fraginfo_file[thread_seq]) { fwrite(buf, strlen(buf),1, g_frag_prog_para.fraginfo_file[thread_seq]); fflush(g_frag_prog_para.fraginfo_file[thread_seq]); } } void index_local_log(const char* filename, char* buf, uint32 buflen) { FILE *pFile; if ((pFile = fopen(filename, "a"))!=NULL) { fwrite(buf, buflen, 1, pFile); fclose(pFile); } } void frag_json_report(int local_log_type, frag_info_t* frag, http_infor* a_http,struct streaminfo *a_tcp, int thread_seq) { AV_info_t p; opt_unit_t opt[2]; int opt_num = 0; char query_url_key[1024] = {0}; int gen_key = 0; p.a_stream = a_tcp; p.pid = frag->tuple7_id; if(frag->frag_opt[FRAG_URL].opt_value!=NULL) { p.url = frag->frag_opt[FRAG_URL].opt_value; } else { p.url = NULL; } p.length = a_http->cont_length; p.protocol = frag->proto; /*单向流对准key,20191126,无论单向流还是双向流都发送该字段*/ gen_key = append_http_query_key(a_tcp, a_http->http_session_seq, query_url_key, sizeof(query_url_key)); if(gen_key >= 0) { opt[opt_num].opt_len = strlen(query_url_key); opt[opt_num].opt_type = META_OPT_SINGLE_KEY; opt[opt_num].opt_value = query_url_key; opt_num ++; } /*referer*/ if(frag->frag_opt[FRAG_REFERER].opt_value!=NULL) { opt[opt_num].opt_len = frag->frag_opt[FRAG_REFERER].opt_len; opt[opt_num].opt_type = META_OPT_REFERER; opt[opt_num].opt_value = frag->frag_opt[FRAG_REFERER].opt_value; opt_num ++; } AV_local_log_convert_json(&p,local_log_type, opt,opt_num); } //void frgmnt_index_sendback_more(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) { cJSON* root; char* out = NULL; uint32_t out_len = 0; char ipbuf[32] = {0}; int ipbuf_len = 32; string topic_name; int cb_ret = 0; char pid_buf[64] = {0}; root = cJSON_CreateObject(); /*topic*/ if(0==frag->data_dir) { cJSON_AddStringToObject(root, "topic", TOPIC_MEDIA_INDEX_DATA); 
topic_name = TOPIC_MEDIA_INDEX_DATA; FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[INDEX_TOPIC], 0, FS_OP_ADD, 1); } else { cJSON_AddStringToObject(root, "topic",TOPIC_MEDIA_CONVERGE_DATA); topic_name = TOPIC_MEDIA_CONVERGE_DATA; FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[CNVG_TOPIC], 0, FS_OP_ADD, 1); } /*service_id*/ cJSON_AddNumberToObject(root, "service_id", frag->service_id); /*capIP*/ inet_ntop(AF_INET, &g_frag_prog_para.local_ip_nr, ipbuf, ipbuf_len); cJSON_AddStringToObject(root, "cap_ip", ipbuf); /*tuple7=pid !!!!PID以数字加入JSON会错误*/ snprintf(pid_buf, sizeof(pid_buf), "%llu", frag->tuple7_id); cJSON_AddStringToObject(root, "pid", pid_buf); /*dir*/ cJSON_AddNumberToObject(root, "dir", a_tcp->dir); /*curdir*/ cJSON_AddNumberToObject(root, "curdir", a_http->curdir); /*url*/ if(NULL!=frag->frag_opt[FRAG_URL].opt_value) { cJSON_AddStringToObject(root, "url", frag->frag_opt[FRAG_URL].opt_value); } /*ua*/ if(NULL!=frag->frag_opt[FRAG_UA].opt_value) { cJSON_AddStringToObject(root, "user_agent", frag->frag_opt[FRAG_UA].opt_value); } /*referer*/ if(NULL!=frag->frag_opt[FRAG_REFERER].opt_value) { cJSON_AddStringToObject(root, "referer", frag->frag_opt[FRAG_REFERER].opt_value); } /*s2c_content*/ if(NULL!=frag->frag_opt[FRAG_S2C_CONTENT].opt_value) { frag->frag_opt[FRAG_S2C_CONTENT].opt_value = (char*)dictator_realloc(thread_seq, frag->frag_opt[FRAG_S2C_CONTENT].opt_value, frag->frag_opt[FRAG_S2C_CONTENT].opt_len+1); frag->frag_opt[FRAG_S2C_CONTENT].opt_value[frag->frag_opt[FRAG_S2C_CONTENT].opt_len] = '\0'; frag->frag_opt[FRAG_S2C_CONTENT].opt_len += 1; cJSON_AddStringToObject(root, "s2c_content", frag->frag_opt[FRAG_S2C_CONTENT].opt_value); } out = cJSON_Print(root); out_len = strlen(out); if(MESA_trace_match_addr(g_frag_prog_para.addr_trace, (trace_layer_addr*)(&a_tcp->addr))) { const char* addr = NULL; char filename[MAX_PATH_LEN] = {0}; addr = printaddr(&a_tcp->addr, thread_seq); snprintf(filename,sizeof(filename),"%s/%s,%u", 
g_frag_prog_para.trace_dir,addr, a_http->http_session_seq); index_local_log(filename, out, out_len); } /*kfaka*/ #if K_PROJECT if(NULL!=g_multi_kafka_producer) { cb_ret = rd_kafka_produce(g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, out, out_len, NULL, 0, NULL); } #else if(NULL!=g_frag_prog_para.index_kafka_producer) { cb_ret = g_frag_prog_para.index_kafka_producer->SendData(topic_name, (void *)out, (size_t)out_len); } #endif if(out) { free(out); } cJSON_Delete(root); } /* void frgmnt_index_sendback(frag_info_t* frag, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) { for(int i=0;i<10000;i++) { frgmnt_index_sendback_more(frag,a_http,a_tcp,thread_seq); } } */ void create_media_opt(media_opt_info_t** media_optinfo, uchar opt_type, char* opt_value, uint32 opt_len, int thread_seq) { media_opt_info_t* node = *media_optinfo; if(node==NULL) { node = (media_opt_info_t*)dictator_malloc(thread_seq, sizeof(media_opt_info_t)); node->opt = NULL; node->opt_content_id = -1; node->opt_num = 0; } int opt_num = node->opt_num; node->opt = (opt_unit_t*)dictator_realloc(thread_seq, (void*)node->opt, sizeof(struct opt_unit_t)*(1+opt_num)); memset((void*)&node->opt[opt_num],0,sizeof(struct opt_unit_t)); node->opt[opt_num].opt_value = (char*)dictator_malloc(thread_seq, opt_len); memcpy(node->opt[opt_num].opt_value,opt_value,opt_len); node->opt[opt_num].opt_type = opt_type; node->opt[opt_num].opt_len = opt_len; node->opt_num += 1; (*media_optinfo) = node; } void destroy_media_opt(media_opt_info_t** media_optinfo, int thread_seq) { if((*media_optinfo)->opt!=NULL) { for(int i=0;i<(*media_optinfo)->opt_num;i++) { if((*media_optinfo)->opt[i].opt_value!=NULL) { dictator_free(thread_seq, (*media_optinfo)->opt[i].opt_value); (*media_optinfo)->opt[i].opt_value = NULL; } } dictator_free(thread_seq, (*media_optinfo)->opt); (*media_optinfo)->opt = NULL; } dictator_free(thread_seq, *media_optinfo); *media_optinfo = NULL; } void 
append_media_opt(media_opt_info_t* media_optinfo, uchar opt_id, char* opt_value, uint32 opt_len, int thread_seq) { (media_optinfo)->opt[opt_id].opt_value = (char*)dictator_realloc(thread_seq, (media_optinfo)->opt[opt_id].opt_value, (media_optinfo)->opt[opt_id].opt_len+opt_len); memcpy((media_optinfo)->opt[opt_id].opt_value+(media_optinfo)->opt[opt_id].opt_len,opt_value,opt_len); (media_optinfo)->opt[opt_id].opt_len += opt_len; } int ts_data_check(char* data, uint32 data_len) { return 0; } uchar frgmnt_av_sendback(frag_info_t* frag, char* data, uint32 data_len, http_infor* a_http, struct streaminfo *a_tcp, int thread_seq) { char DST = AV_DST_NORMAL; char query_url_key[1024] = {0}; int gen_key = 0; char serviceid[256] = {0}; snprintf(serviceid,sizeof(serviceid),"%d",frag->service_id); /*特别的,HLS .ts的数据会话,第一个字节为0x47*/ /*不需要判断range,碎片化不会再分线程*/ /*不需要考虑lostlen, lost之后无法解码,直接不回传*/ if(frag->service_id==SERVICE_HLS_DATA && frag->cont_offset==0) { if(*data!=0x47) { return PROT_STATE_DROPME; } } /*确认frag*/ project_req_add_char(a_tcp, g_frag_prog_para.frag_project_id, 1); /*select DST*/ switch(frag->media_type) { case FILE_IOS: case FILE_ANDRIOD: case FILE_APP: DST = AV_DST_APP; break; default: DST = AV_DST_FRAGMENT_QUERY; break; } /*media_info*/ if(!(frag->send_meta_flag)) { frag->av_info = (av_info_t*)dictator_malloc(thread_seq, sizeof(av_info_t)); memset(frag->av_info,0,sizeof(av_info_t)); /*PID*/ frag->av_info->media_info.pid = frag->tuple7_id; /*protocol*/ frag->av_info->media_info.protocol = frag->proto; /*media_type*/ frag->av_info->media_info.media_type = frag->media_type; /*media_len*/ if(a_http->cont_range!=NULL) { frag->av_info->media_info.prog_len = a_http->cont_range->len; frag->cont_offset = a_http->cont_range->start; } else if(a_http->cont_length!=0) { frag->av_info->media_info.prog_len = a_http->cont_length; } else { frag->av_info->media_info.prog_len = MAX_UINT64_VAL; } /*serviceID*/ if(0!=frag->service_id) { 
create_media_opt(&(frag->av_info->media_optinfo),META_OPT_SERVICE_ID, serviceid, strlen(serviceid),thread_seq); } /*addr*/ create_media_opt(&(frag->av_info->media_optinfo),META_OPT_LAYER_ADDR, (char*)(&a_tcp->addr), sizeof(a_tcp->addr), thread_seq); /*url*/ if(NULL!=frag->frag_opt[FRAG_URL].opt_value) { create_media_opt(&(frag->av_info->media_optinfo),META_OPT_TYPE_URL, frag->frag_opt[FRAG_URL].opt_value, frag->frag_opt[FRAG_URL].opt_len-1,thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[URL_META], 0, FS_OP_ADD, 1); } /*单向流对准key,20191126,无论单向流还是双向流都发送该字段*/ gen_key = append_http_query_key(a_tcp, a_http->http_session_seq, query_url_key, sizeof(query_url_key)); if(gen_key >= 0) { create_media_opt(&(frag->av_info->media_optinfo), META_OPT_SINGLE_KEY, query_url_key, strlen(query_url_key),thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[KEY_META], 0, FS_OP_ADD, 1); } /*ua*/ if(NULL!=frag->frag_opt[FRAG_UA].opt_value) { create_media_opt(&(frag->av_info->media_optinfo),META_OPT_USER_AGENT, frag->frag_opt[FRAG_UA].opt_value, frag->frag_opt[FRAG_UA].opt_len-1,thread_seq); } /*referer*/ if(NULL!=frag->frag_opt[FRAG_REFERER].opt_value) { create_media_opt(&(frag->av_info->media_optinfo),META_OPT_REFERER, frag->frag_opt[FRAG_REFERER].opt_value, frag->frag_opt[FRAG_REFERER].opt_len-1,thread_seq); } if(NULL!=frag->av_info->media_optinfo) { AV_send_media_info_to(DST, &(frag->av_info->media_info),frag->av_info->media_optinfo->opt,frag->av_info->media_optinfo->opt_num,thread_seq); } else { AV_send_media_info_to(DST, &(frag->av_info->media_info),NULL,0,thread_seq); } frag_write_to_log(FRAG_META_LOG, frag, a_http, a_tcp, thread_seq); frag->send_meta_flag = 1; /*有数据才发元信息*/ frag_write_to_log(FRAG_ZERO_LOG, frag, a_http, a_tcp,thread_seq); } AV_send_data_to(DST, frag->av_info->media_info.pid, frag->cont_offset, (const char*)data, data_len, frag->av_info->media_info.protocol, thread_seq, (const struct layer_addr*)(&a_tcp->addr)); 
frag->cont_offset += data_len; /*stat*/ switch(frag->media_type) { case FILE_OSMF: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_BYTES], 0, FS_OP_ADD, data_len); break; case FILE_HLS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_BYTES], 0, FS_OP_ADD, data_len); break; case FILE_IOS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_BYTES], 0, FS_OP_ADD, data_len); break; case FILE_ANDRIOD: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_BYTES], 0, FS_OP_ADD, data_len); break; case FILE_APP: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_BYTES], 0, FS_OP_ADD, data_len); break; case FILE_FRAG: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_BYTES], 0, FS_OP_ADD, data_len); break; case FILE_MAYBE_FRAG: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_BYTES], 0, FS_OP_ADD, data_len); break; default: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_PKTS], 0, FS_OP_ADD, 1); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_BYTES], 0, FS_OP_ADD, data_len); break; } return PROT_STATE_GIVEME; } int init_frag(frag_info_t **frag, int thread_seq) { frag_info_t* pme = 
(frag_info_t*)dictator_malloc(thread_seq, sizeof(frag_info_t));
	/* (tail of init_frag) hand a zero-filled session object back through *frag; always returns 0 */
	memset(pme, 0 ,sizeof(frag_info_t));
	*frag = pme;
	return 0;
}

/*
 * Release everything owned by a fragment session: the Maat scan status, the stored option
 * values, the av_info block with its option list, and finally the session object itself.
 * Returns -1 when frag is NULL, otherwise 0.
 */
int release_frag(frag_info_t *frag, int thread_seq)
{
	if(frag == NULL)
	{
		return -1;
	}
	if(frag->mid != NULL)
	{
		Maat_clean_status(&(frag->mid));
		frag->mid = NULL;
	}
	/* NOTE(review): the text between "i" and "frag_opt" on the next line was lost in this copy of
	   the file (a "<...>" span was stripped). It presumably held the loop bound, "i++", and a
	   non-NULL test on frag->frag_opt[i].opt_value -- restore from version control. */
	for(int i=0;ifrag_opt[i].opt_value)
		{
			dictator_free(thread_seq, frag->frag_opt[i].opt_value);
		}
	}
	if(frag->av_info!= NULL)
	{
		if(NULL!=frag->av_info->media_optinfo)
		{
			destroy_media_opt(&(frag->av_info->media_optinfo), thread_seq);
			frag->av_info->media_optinfo = NULL;
		}
		dictator_free(thread_seq,frag->av_info);
	}
	dictator_free(thread_seq, frag);
	return 0;
}

/* Release a node with free(); NULL is ignored. */
void free_media(void* data)
{
	if(NULL!=data)
	{
		free(data);
	}
}

/*
 * Log the related_id and media type of a media-format hash node together with the
 * label for eliminate_type, then return 1.
 * eliminate_type indexes hash_eliminate_type without a range check.
 */
int expire_media_hash_node(void *data, int eliminate_type)
{
	media_format_hnode_t* mdi = (media_format_hnode_t*)data;
	MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_INFO, FRGMNT_PLUGIN_NAME, "{%s:%d} expire_media_hash_node %s: related_id: %llu, media_type:0x%02x]", __FILE__,__LINE__, hash_eliminate_type[eliminate_type], mdi->relate_id, mdi->media_type);
	return 1;
}

/*
 * Build the session key string "<addr>,<client ISN>,<proxy type>,<http session seq>" in
 * frag->tuple7 and return the id AV_make_porg_id() derives from it.
 * The ISN is 0 when MESA_get_stream_opt() fails; the proxy type is 0 unless the parent
 * stream is an HTTP proxy.
 */
uint64 make_frag_pid(frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq)
{
	uint32 tcp_seq = 0;
	uint32 tcp_seq_len = sizeof(uint32);
	uchar proxy_type = 0;
	int rec = 0;

	/*create tuple7_id*/
	if((a_tcp->pfather != NULL) && (a_tcp->pfather->type == STREAM_TYPE_HTTP_PROXY))
	{
		proxy_type = a_tcp->pfather->type;
	}
	rec = MESA_get_stream_opt(a_tcp, MSO_TCP_ISN_C2S, (void*)&tcp_seq, (int*)&tcp_seq_len);
	if(rec < 0)
	{
		tcp_seq = 0;
	}
	snprintf(frag->tuple7,sizeof(frag->tuple7),"%s,%u,%u,%u",printaddr(&a_tcp->addr,thread_seq),tcp_seq,proxy_type,a_http->http_session_seq);
	return AV_make_porg_id((unsigned char*)frag->tuple7, strlen(frag->tuple7), PID_TYPE_RESORT);
}

/*the larger the compile value, the higher the priority*/
int select_maat_result(struct Maat_rule_t maat_result[],int result_num)
{
	int best_idx = 0;
	int max_config = maat_result[0].config_id;
	/* NOTE(review): the text between "i" and "addr)" on the next line was lost in this copy of the
	   file (a "<...>" span was stripped). It held the rest of this loop and function, and the
	   beginning of is_frag_server_ip() up to its scan call -- restore from version control. */
	for(int i=1;iaddr), 6, &result, 1, &mid, a_tcp->threadnum);
Maat_clean_status(&mid); if(ret > 0) { return 1; } return 0; } uchar set_frgmnt_hit(struct Maat_rule_t maat_result[], int result_num,frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { scan_status_t mid = NULL; int idx = select_maat_result(maat_result, result_num); if(NULL!=maat_result) { frag->config_id = maat_result[idx].config_id; frag->service_id = maat_result[idx].service_id; frag->proto = AV_PROTOCOL_HTTP; frag->media_type = maat_result[idx].do_blacklist; /*数据发往index 或者rssb*/ frag->session_type = maat_result[idx].action; /*0:索引信息 1/2:码流信息*/ frag->data_dir = maat_result[idx].do_log; } /*C2S测命中且IP命中FRAG_SERVER_IP,需要发碎片预告,索引的和数据的都要发*/ /*S2C测命中且IP命中FRAG_SERVER_IP, 认为是碎片*/ if(g_frag_prog_para.frag_forecast_switch && 0!=frag->data_dir) { if(is_frag_server_ip(a_tcp)) { frag->hit_server_ip = 1; } if(frag->media_type==FILE_MAYBE_FRAG) { /*FILE_FRAG_FORECAST是解决单向流的情况,由S2C应答侧识别的,不能存在URL*/ FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HIT_SESSION], 0, FS_OP_ADD, 1); if(a_http->p_url!=NULL) { FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[URLDROP_SESSION], 0, FS_OP_ADD, 1); return PROT_STATE_DROPME; } if(frag->hit_server_ip==0) { FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[DROP_SESSION], 0, FS_OP_ADD, 1); return PROT_STATE_DROPME; } } } frag->proc_stat = FRAG_PROC_STAT_HIT; frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); switch(frag->media_type) { case FILE_OSMF: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1); break; case FILE_HLS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_SESSION], 0, FS_OP_ADD, 1); break; case FILE_IOS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_SESSION], 0, FS_OP_ADD, 1); break; case FILE_ANDRIOD: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_SESSION], 0, FS_OP_ADD, 1); break; case FILE_APP: FS_operate(g_frag_prog_para.fs_handle, 
g_frag_prog_para.fs_id[APP_SESSION], 0, FS_OP_ADD, 1); break; case FILE_FRAG: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_SESSION], 0, FS_OP_ADD, 1); break; case FILE_MAYBE_FRAG: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_SESSION], 0, FS_OP_ADD, 1); break; default: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_SESSION], 0, FS_OP_ADD, 1); break; } if(frag->session_type==SESSION_FRGMNT_INDEX) { switch(frag->media_type) { case FILE_HLS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_INDEX_SESSION], 0, FS_OP_ADD, 1); if(a_http->cont_encoding==HTTP_CONT_ENCOD_GZIP) { FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[INDEX_GZIP], 0, FS_OP_ADD, 1); } break; default: break; } } else { switch(frag->media_type) { case FILE_OSMF: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_DATA_SESSION], 0, FS_OP_ADD, 1); break; case FILE_HLS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HLS_DATA_SESSION], 0, FS_OP_ADD, 1); break; case FILE_IOS: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[IOS_DATA_SESSION], 0, FS_OP_ADD, 1); break; case FILE_ANDRIOD: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[ANDRIOD_DATA_SESSION], 0, FS_OP_ADD, 1); break; case FILE_APP: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_DATA_SESSION], 0, FS_OP_ADD, 1); break; case FILE_FRAG: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_DATA_SESSION], 0, FS_OP_ADD, 1); break; case FILE_MAYBE_FRAG: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_SESSION], 0, FS_OP_ADD, 1); break; default: FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[WEIRD_DATA_SESSION], 0, FS_OP_ADD, 1); break; } } } int set_osmf_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { frag->config_id = 0; frag->service_id = SERVICE_OSMF_DATA; frag->proto = 
AV_PROTOCOL_HTTP; frag->media_type = FILE_OSMF; frag->session_type = SESSION_FRGMNT_DATA; frag->data_dir = FRAG_TYPE_DATA_S2C; frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1); return 1; } /* int set_hls_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { frag->config_id = 0; frag->service_id = SERVICE_HLS_DATA; frag->proto = AV_PROTOCOL_HTTP; frag->media_type = FILE_HLS; frag->session_type = SESSION_FRGMNT_DATA; frag->data_dir = FRAG_TYPE_DATA_S2C; frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[OSMF_SESSION], 0, FS_OP_ADD, 1); return 1; } */ int set_app_hit(frag_info_t *frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { frag->config_id = 0; frag->service_id = SERVICE_APP; frag->proto = AV_PROTOCOL_HTTP; frag->media_type = FILE_APP; frag->session_type = SESSION_FRGMNT_UNKNOW; frag->data_dir = FRAG_TYPE_DATA_S2C; frag->tuple7_id = make_frag_pid(frag, a_http, a_tcp, thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[APP_SESSION], 0, FS_OP_ADD, 1); return 1; } static int url_indicated_app(const char *url) { const char *pfilename = NULL, *p1 = NULL, *p2 = NULL, *p3 = NULL, *p = NULL; unsigned int len=0; if(url == NULL) return 0; pfilename = strrchr(url, '/'); if(pfilename == NULL || *(pfilename + 1) == '\0') return 0; pfilename = pfilename + 1; len = strlen(pfilename); p1 = strchr(pfilename, ';'); p2 = strchr(pfilename, '?'); p3 = strchr(pfilename, '#'); if(p1>p2) { p = p1; p1 = p2; p2 = p; } if(p1>p3) { p = p3; p1 = p3; p3 = p; } if(p2>p3) { p = p2; p2 = p3; p3 = p; } if(p1 != NULL) len = p1 - pfilename; else if(p2 != NULL) len = p2 - pfilename; else if(p3 != NULL) len = p3 - pfilename; if(!strncasecmp(pfilename + len - 4, ".apk", 4)) return FILE_ANDRIOD; if(!strncasecmp(pfilename + len - 4, ".ipa", 4)) 
return FILE_IOS; return 0; } void create_related_id(frag_info_t *frag, http_infor* a_http, struct streaminfo *a_tcp) { if(frag->related_id!=0) return; uint64 cont_len = a_http->cont_length; /*记录文件长度,用以生成related_id, 只处理下载S2C,并且优先以content_range的len为文件总长度*/ if(a_http->cont_range!=NULL) { cont_len = a_http->cont_range->len; } if(cont_len==0) return; struct layer_addr* addr = &a_tcp->addr; uchar pid_buf[PID_BUF_LEN] = {0}; if(addr->addrtype == ADDR_TYPE_IPV4) { struct tuple4* tuple4 = (struct tuple4 *)(addr->paddr); char ip4saddr[IPV4_ADDR_LEN] = {0}; char ip4daddr[IPV4_ADDR_LEN] = {0}; inet_ntop(AF_INET, &tuple4->saddr, ip4saddr, IPV4_ADDR_LEN); inet_ntop(AF_INET, &tuple4->daddr, ip4daddr, IPV4_ADDR_LEN); snprintf((char*)pid_buf, PID_BUF_LEN, "IPV4%s,%s,%llu", ip4saddr, ip4daddr, cont_len); } else if(addr->addrtype == ADDR_TYPE_IPV6) { struct tuple6* tuple6 = (struct tuple6 *)(addr->paddr); char ip6saddr[IPV6_ADDR_LEN] = {0}; char ip6daddr[IPV6_ADDR_LEN] = {0}; inet_ntop(AF_INET6, &tuple6->saddr, ip6saddr, IPV6_ADDR_LEN); inet_ntop(AF_INET6, &tuple6->daddr, ip6daddr, IPV6_ADDR_LEN); snprintf((char*)pid_buf, PID_BUF_LEN, "IPV6%s,%s,%llu", ip6saddr, ip6daddr, cont_len); } else { return; } frag->related_id = AV_make_porg_id(pid_buf, (unsigned int)strlen((const char*)pid_buf), PID_TYPE_RESORT); } long media_hash_search_cb(void *data, const uint8_t *key, uint size, void *user_arg) { frag_info_t* frag = (frag_info_t*)user_arg; if(NULL!=data) { frag->media_type = ((media_format_hnode_t*)data)->media_type; return 1; } return 0; } int frag_multithread_hash_search(const char *pHeader, int iHeaderLen, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { if(!g_frag_prog_para.app_switch && !g_frag_prog_para.osmf_data_feature_switch) return 0; long rec_cb = 0; char* find_pos = NULL; media_format_hnode_t* media_format_hnode = NULL; create_related_id(frag, a_http, a_tcp); if(0!=frag->related_id) { /*多线程下载情况下,查询media_hash进行识别*/ 
MESA_htable_search_cb(g_frag_prog_para.media_hash, (const uint8_t *)&frag->related_id, sizeof(frag->related_id), media_hash_search_cb, (void*)frag, &rec_cb); if(rec_cb==0) { /*内容特征域匹配*/ if(g_frag_prog_para.app_switch && g_frag_prog_para.app_gzip_switch) { find_pos = (char*)memmem(pHeader, iHeaderLen, g_app_feature, sizeof(g_app_feature)); if(NULL!=find_pos) { frag->media_type = FILE_APP; } } if(g_frag_prog_para.osmf_data_feature_switch) { find_pos = (char*)memmem(pHeader, iHeaderLen, g_osmf_feature, sizeof(g_osmf_feature)); if(NULL!=find_pos) { frag->media_type = FILE_OSMF; } } if(NULL!=find_pos) { /*add prog_id , 普通多线程文件下载*/ media_format_hnode = (media_format_hnode_t*)malloc(sizeof(media_format_hnode_t)); media_format_hnode->media_type = frag->media_type; media_format_hnode->relate_id = frag->related_id; MESA_htable_add(g_frag_prog_para.media_hash, (const unsigned char*)&frag->related_id, sizeof(frag->related_id), (const void*)media_format_hnode); /*write log*/ MESA_handle_runtime_log(g_frag_prog_para.logger,RLOG_LV_INFO,FRGMNT_PLUGIN_NAME, (char*)"[%s:%d] media_format_indentify MESA_htable_add related_id:%llu, media_type:0x%02x, url:%s." 
, __FILE__,__LINE__, frag->related_id, frag->media_type, a_http->p_url); } } /*命中特征域*/ if(rec_cb || NULL!=find_pos) { switch(frag->media_type) { case FILE_OSMF: set_osmf_hit(frag, a_http, a_tcp, thread_seq); return 1; break; case FILE_APP: set_app_hit(frag, a_http, a_tcp, thread_seq); frag->session_type = SESSION_FRGMNT_DATA; return 1; break; default: break; } } } return 0; } char frag_scan(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { int ret = 0; uchar return_value = PROT_STATE_GIVEME; const char* scan_data = NULL; uint32 scan_datalen = 0; const char* region_name = {0}; uint32 region_name_len = 0; int found_pos[MAAT_RESULT_NUM] = {0}; int hit_result = 0; struct Maat_rule_t maat_result[MAAT_RESULT_NUM]; char url[2048] = {0}; memset(&maat_result, 0, sizeof(maat_result)); /*scan preproc*/ switch(session_info->prot_flag) { case HTTP_CONTENT: case HTTP_UNGZIP_CONTENT: /*内容只扫描首包*/ if((session_info->prot_flag==HTTP_CONTENT||session_info->prot_flag==HTTP_UNGZIP_CONTENT) && a_http->curdir==DIR_S2C && frag->s2c_first_pkt==0) { frag->s2c_first_pkt = 1; scan_data = (char*)session_info->buf; scan_datalen = session_info->buflen; } break; case HTTP_CONT_LENGTH: case HTTP_CONT_RANGE: case HTTP_USER_AGENT: case HTTP_REFERER: case HTTP_COOKIE: case HTTP_HOST: /*不需要扫描*/ break; case HTTP_CONT_TYPE: if(a_http->curdir==DIR_S2C) { scan_data = (char*)session_info->buf; scan_datalen = session_info->buflen; } break; default: if(g_frag_prog_para.app_switch && session_info->prot_flag==HTTP_MESSAGE_URL) { memcpy(url, session_info->buf, MIN((int)session_info->buflen, (int)(sizeof(url)-1))); frag->media_type = url_indicated_app(url); if(frag->media_type) { hit_result = set_app_hit(frag,a_http,a_tcp,thread_seq); } } if(!frag->media_type) { scan_data = (char*)session_info->buf; scan_datalen = session_info->buflen; } break; } /*scan*/ if(NULL!=scan_data) { /* 表达式扫描*/ region_name = http_proto_flag2region(session_info->prot_flag); 
region_name_len = strlen(region_name); ret=Maat_set_scan_status(g_frag_prog_para.feather, &frag->mid, MAAT_SET_SCAN_DISTRICT,region_name,strlen(region_name)); if(ret<0) { return PROT_STATE_DROPME; } hit_result = Maat_full_scan_string( g_frag_prog_para.feather, g_frag_prog_para.expr_tableid, CHARSET_GBK, scan_data, scan_datalen, maat_result, found_pos, MAAT_RESULT_NUM, &(frag->mid),thread_seq); /*IP地址扫描*/ if(g_frag_prog_para.ip_switch && hit_result<=0 && frag->s2c_first_pkt) { /*S2C首包无法命中,扫描IP地址*/ hit_result = Maat_scan_proto_addr(g_frag_prog_para.feather,g_frag_prog_para.ip_tableid, (struct ipaddr*)&a_tcp->addr, TCL_PROTOCOL, maat_result,MAAT_RESULT_NUM,&(frag->mid),thread_seq); } /* 多线程情况下HASH查找*/ if(hit_result<=0 && frag->s2c_first_pkt) { hit_result = frag_multithread_hash_search(scan_data,scan_datalen,frag,a_http,a_tcp,thread_seq); /*不再扫描,DROP会话*/ if(hit_result<=0) { //if(scan_data!=NULL && *scan_data==0x47) //{ // hit_result = set_hls_hit(frag, a_http, a_tcp, thread_seq); //} //else //{ return PROT_STATE_DROPME; //} } } } if(hit_result>0) { return_value = set_frgmnt_hit(maat_result,hit_result,frag,a_http,a_tcp,thread_seq); } return return_value; } char frag_save(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { int opt_type = -1; switch(session_info->prot_flag) { case HTTP_MESSAGE_URL: opt_type = FRAG_URL; break; case HTTP_HOST: opt_type = FRAG_HOST; break; case HTTP_USER_AGENT: opt_type = FRAG_UA; break; case HTTP_REFERER: opt_type = FRAG_REFERER; break; case HTTP_CONT_TYPE: if(a_http->curdir==DIR_S2C) { opt_type = FRAG_S2C_CONTENT_TYPE; } break; default: break; } if(-1!=opt_type && frag->frag_opt[opt_type].opt_value==NULL) { frag->frag_opt[opt_type].opt_value = (char*)dictator_malloc(thread_seq, session_info->buflen+1); memcpy(frag->frag_opt[opt_type].opt_value, session_info->buf, session_info->buflen); frag->frag_opt[opt_type].opt_value[session_info->buflen] = '\0'; 
frag->frag_opt[opt_type].opt_len = session_info->buflen+1; } return 0; } uchar frag_sendback(stSessionInfo* session_info, frag_info_t* frag, http_infor *a_http, struct streaminfo *a_tcp, int thread_seq) { uchar return_stat = PROT_STATE_GIVEME; /*不是索引信息,不确定目的地,通过a_tcp->dir判断是否是单向流再决定目的地*/ if(frag->session_type==SESSION_FRGMNT_UNKNOW) { if(a_tcp->dir == DIR_DOUBLE) { frag->session_type = SESSION_FRGMNT_DATA; } else { frag->session_type = SESSION_FRGMNT_INDEX; } } /*change because of a_tcp->dir is not accurate*/ if(frag->session_type == SESSION_FRGMNT_INDEX) { if(a_tcp->dir==DIR_DOUBLE && frag->data_dir!=0) { frag->session_type = SESSION_FRGMNT_DATA; } } /*index need cache;*/ if(frag->session_type == SESSION_FRGMNT_INDEX) { if(DIR_S2C==a_http->curdir && (session_info->prot_flag==HTTP_CONTENT || session_info->prot_flag==HTTP_UNGZIP_CONTENT)) { if(frag->frag_opt[FRAG_S2C_CONTENT].opt_lenfrag_opt[FRAG_S2C_CONTENT].opt_value = (char*)dictator_realloc(thread_seq, frag->frag_opt[FRAG_S2C_CONTENT].opt_value, frag->frag_opt[FRAG_S2C_CONTENT].opt_len+session_info->buflen); memcpy(frag->frag_opt[FRAG_S2C_CONTENT].opt_value+frag->frag_opt[FRAG_S2C_CONTENT].opt_len, session_info->buf, session_info->buflen); frag->frag_opt[FRAG_S2C_CONTENT].opt_len += session_info->buflen; } else { frag->proc_stat = FRAG_PROC_STAT_SCAN; return PROT_STATE_DROPME; } } } /*send 确定发往CPZ*/ if(frag->session_type==SESSION_FRGMNT_DATA) { if(a_http->res_code==200 || a_http->res_code==206 || a_http->res_code==0) { if(session_info->prot_flag==HTTP_CONTENT || session_info->prot_flag==HTTP_UNGZIP_CONTENT) { return_stat = frgmnt_av_sendback(frag,(char*)session_info->buf,session_info->buflen,a_http,a_tcp,thread_seq); } } else { frag->proc_stat = FRAG_PROC_STAT_SCAN; return_stat = PROT_STATE_DROPME; } } return return_stat; } uchar FRAG_MONITOR_ENTRY(stSessionInfo* session_info, void **param,int thread_seq, struct streaminfo *a_tcp, void *a_packet) { if(NULL==session_info) { return PROT_STATE_DROPME; } uchar 
return_stat = PROT_STATE_GIVEME; http_infor* a_http = (http_infor *)(session_info->app_info); frag_info_t* frag = (frag_info_t*)*param; /*other HTTP, do not proc*/ if(a_http->method!=HTTP_METHOD_UNKNOWN&&a_http->method!=HTTP_METHOD_GET) { return PROT_STATE_DROPME; } /*init*/ if(frag==NULL) { if(init_frag(&frag, thread_seq) <0) { return PROT_STATE_DROPME; } *param = frag; FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HTTP_SESSION], 0, FS_OP_ADD, 1); } if(session_info->prot_flag!=HTTP_STATE && frag->proc_stat==FRAG_PROC_STAT_SCAN) { return_stat = frag_scan(session_info,frag,a_http,a_tcp,thread_seq); if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION; if(frag->proc_stat==FRAG_PROC_STAT_HIT) { frag->proc_stat = FRAG_PROC_STAT_SEND; } } /*碎片元信息存储*/ frag_save(session_info,frag,a_http,a_tcp,thread_seq); /*等待refer,发FD日志*/ if(frag->proc_stat==FRAG_PROC_STAT_SEND) { /*碎片化管控*/ if(NULL!=a_http->p_url && DIR_C2S==a_http->curdir && session_info->prot_flag==HTTP_STATE && HTTP_DATA_BEGIN==a_http->http_state) { return_stat = frag_check_block(frag, a_http, a_tcp, a_packet, thread_seq); if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION; } } if(session_info->prot_flag!=HTTP_STATE && frag->proc_stat==FRAG_PROC_STAT_SEND) { /*数据回传*/ return_stat = frag_sendback(session_info,frag,a_http,a_tcp,thread_seq); if(PROT_STATE_DROPME==return_stat) goto DROP_SESSION; } DROP_SESSION: /*release*/ if(return_stat==PROT_STATE_DROPME||session_info->session_state&SESSION_STATE_CLOSE) { /*确认frag*/ project_req_add_char(a_tcp, g_frag_prog_para.frag_project_id, 0); if(frag->proc_stat==FRAG_PROC_STAT_SEND) { if(frag->session_type==SESSION_FRGMNT_INDEX) { frgmnt_index_sendback(frag,a_http,a_tcp,thread_seq); frag_write_to_log(FRAG_INDEX_LOG, frag, a_http, a_tcp, thread_seq); } /*传输运行状态发送至kafka*/ if(g_frag_prog_para.frag_report_switch) { frag_json_report(LOCAL_LOG_TYPE_FRAG_FIND_PROG,frag, a_http, a_tcp, thread_seq); FS_operate(g_frag_prog_para.fs_handle, 
g_frag_prog_para.fs_id[FRAG_PROG_TOPIC], 0, FS_OP_ADD, 1); } /*碎片预告发给kafka,暂时只发碎片的*/ if(g_frag_prog_para.frag_forecast_switch && 0!=frag->data_dir && a_http->p_url!=NULL && frag->hit_server_ip==0) { frag_json_report(LOCAL_LOG_TYPE_FRAG_SERVER_IP,frag, a_http, a_tcp, thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[FRAG_URL_TOPIC], 0, FS_OP_ADD, 1); } } release_frag((frag_info_t*)*param, thread_seq); FS_operate(g_frag_prog_para.fs_handle, g_frag_prog_para.fs_id[HTTP_SESSION_CLOSE], 0, FS_OP_ADD, 1); *param = NULL; } return return_stat; } int read_main_conf() { char log_filename[MAX_PATH_LEN]={0}; char table_info_filename [MAX_PATH_LEN]={0}; char config_buff[MAX_PATH_LEN]={0}; int log_level = 0; g_frag_prog_para.avconf_handle = wired_cfg_create("AV_CONF", "./avconf/av.conf"); wired_cfg_init(g_frag_prog_para.avconf_handle); g_frag_prog_para.avcomconf_handle = wired_cfg_create("AV_COM", "./avconf/av_com.conf"); wired_cfg_init(g_frag_prog_para.avcomconf_handle); //LOG wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.log_switch = (short)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogLevel",config_buff,sizeof(config_buff),"30"); log_level = (short)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","LogPath",g_frag_prog_para.log_path,sizeof(config_buff),"./avlog/frag_monitor/"); snprintf(log_filename, sizeof(log_filename), "%s/%s", g_frag_prog_para.log_path, "runtime.log"); g_frag_prog_para.logger = MESA_create_runtime_log_handle(log_filename,log_level); if(NULL==g_frag_prog_para.logger) { printf("MESA_create_runtime_log_handle error.\n"); return -1; } /*maat*/ #if K_PROJECT g_frag_prog_para.frag_url_table_id = Maat_table_register(g_AV_global_feather, "MM_GLOBAL_FRAG_INDEX_URL"); #else g_frag_prog_para.frag_url_table_id = Maat_table_register(g_AV_global_feather, "GLOBAL_FRAG_INDEX_URL"); #endif int value 
= 0; g_frag_prog_para.fs_handle = FS_create_handle(); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatFile",config_buff,sizeof(config_buff),"./avlog/frag_monitor/frag_monitor_status.log"); FS_set_para(g_frag_prog_para.fs_handle, OUTPUT_DEVICE, config_buff, strlen(config_buff)+1); value = 1;//flush by date FS_set_para(g_frag_prog_para.fs_handle, FLUSH_BY_DATE, &value, sizeof(value)); value = 2;//append FS_set_para(g_frag_prog_para.fs_handle, PRINT_MODE, &value, sizeof(value)); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatInterval",config_buff,sizeof(config_buff),"30"); value = atoi(config_buff); FS_set_para(g_frag_prog_para.fs_handle, STAT_CYCLE, &value, sizeof(value)); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","StatSwitch",config_buff,sizeof(config_buff),"1"); value = atoi(config_buff); FS_set_para(g_frag_prog_para.fs_handle, PRINT_TRIGGER, &value, sizeof(value)); if(value) { /*与总控监控一致*/ wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","fs2_ip",config_buff,sizeof(config_buff),""); FS_set_para(g_frag_prog_para.fs_handle, STATS_SERVER_IP, config_buff, strlen(config_buff)+1); wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","fs2_port",config_buff,sizeof(config_buff),"8125"); unsigned short port = atoi(config_buff); FS_set_para(g_frag_prog_para.fs_handle, STATS_SERVER_PORT, &port, sizeof(port)); } wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FRAG_NAME",config_buff,sizeof(config_buff),"frag_monitor"); FS_set_para(g_frag_prog_para.fs_handle, APP_NAME, config_buff, strlen(config_buff)+1); value = 1; FS_set_para(g_frag_prog_para.fs_handle, CREATE_THREAD, &value, sizeof(value)); g_frag_prog_para.fs_id[HTTP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HTTP_links"); g_frag_prog_para.fs_id[HLS_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_links"); g_frag_prog_para.fs_id[OSMF_SESSION] = 
FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_links"); g_frag_prog_para.fs_id[IOS_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_links"); g_frag_prog_para.fs_id[ANDRIOD_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_links"); g_frag_prog_para.fs_id[APP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_links"); g_frag_prog_para.fs_id[HIT_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HIT_links"); g_frag_prog_para.fs_id[URLDROP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "URLDROP_links"); g_frag_prog_para.fs_id[DROP_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "DROP_links"); g_frag_prog_para.fs_id[FRAG_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_links"); g_frag_prog_para.fs_id[MAYBE_FRAG_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MATBE_links"); g_frag_prog_para.fs_id[WEIRD_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_links"); g_frag_prog_para.fs_id[HLS_INDEX_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_index"); g_frag_prog_para.fs_id[HLS_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_data"); g_frag_prog_para.fs_id[OSMF_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_data"); g_frag_prog_para.fs_id[IOS_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_data"); g_frag_prog_para.fs_id[ANDRIOD_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_data"); g_frag_prog_para.fs_id[APP_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, 
FS_CALC_CURRENT, "APP_data"); g_frag_prog_para.fs_id[FRAG_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_data"); g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MATBE_data"); g_frag_prog_para.fs_id[WEIRD_DATA_SESSION] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_data"); g_frag_prog_para.fs_id[HLS_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_d_pkts"); g_frag_prog_para.fs_id[HLS_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HLS_d_bytes"); g_frag_prog_para.fs_id[OSMF_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_d_pkts"); g_frag_prog_para.fs_id[OSMF_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "OSMF_d_bytes"); g_frag_prog_para.fs_id[IOS_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_d_pkts"); g_frag_prog_para.fs_id[IOS_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "IPA_d_bytes"); g_frag_prog_para.fs_id[ANDRIOD_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_d_pkts"); g_frag_prog_para.fs_id[ANDRIOD_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APK_d_bytes"); g_frag_prog_para.fs_id[APP_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_d_pkts"); g_frag_prog_para.fs_id[APP_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "APP_d_bytes"); g_frag_prog_para.fs_id[FRAG_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_d_pkts"); g_frag_prog_para.fs_id[FRAG_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "FRAG_d_bytes"); 
g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MAYBE_d_pkts"); g_frag_prog_para.fs_id[MAYBE_FRAG_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "MAYBE_d_bytes"); g_frag_prog_para.fs_id[WEIRD_DATA_PKTS] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_d_pkts"); g_frag_prog_para.fs_id[WEIRD_DATA_BYTES] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "WEIRD_d_bytes"); g_frag_prog_para.fs_id[INDEX_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "index_topic"); g_frag_prog_para.fs_id[CNVG_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "cnvg_topic"); g_frag_prog_para.fs_id[FRAG_PROG_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "prog_topic"); g_frag_prog_para.fs_id[FRAG_URL_TOPIC] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "url_topic"); g_frag_prog_para.fs_id[URL_META] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "url_meta"); g_frag_prog_para.fs_id[KEY_META] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "key_meta"); g_frag_prog_para.fs_id[HTTP_SESSION_CLOSE] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "HTTP_close"); g_frag_prog_para.fs_id[INDEX_GZIP] = FS_register(g_frag_prog_para.fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, "index_gzip"); FS_start(g_frag_prog_para.fs_handle); /*media_hash*/ uint32_t media_hash_size = 0; uint32_t media_hash_max_elem_num = 0; uint32_t media_hash_expire_time = 0; wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashSize",config_buff,sizeof(config_buff),"65536"); media_hash_size = atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashElemNum",config_buff,sizeof(config_buff),"1048576"); 
media_hash_max_elem_num = atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MediaHashExpireTime",config_buff,sizeof(config_buff),"120"); media_hash_expire_time = atoi(config_buff); MESA_htable_create_args_t hash_args; memset(&hash_args,0,sizeof(MESA_htable_create_args_t)); hash_args.thread_safe = 1; //group lock hash_args.recursive = 1; hash_args.hash_slot_size = media_hash_size; hash_args.max_elem_num = media_hash_max_elem_num; hash_args.eliminate_type = HASH_ELIMINATE_ALGO_LRU; hash_args.expire_time = media_hash_expire_time; hash_args.key_comp = NULL; hash_args.key2index = NULL; hash_args.data_free = free_media; hash_args.data_expire_with_condition = expire_media_hash_node; g_frag_prog_para.media_hash = MESA_htable_create(&hash_args, sizeof(hash_args)); if(NULL==g_frag_prog_para.media_hash) { printf("create media_hash error.\n"); return -1; } //SYSTEM /*app switch*/ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","AppSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.app_switch = (short)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","AppGzipFeatureSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.app_gzip_switch = (short)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","OsmfDataFeatureSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.osmf_data_feature_switch = (short)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","IPSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.ip_switch = (short)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","S2cCacheSize",config_buff,sizeof(config_buff),"500000"); g_frag_prog_para.s2c_cache_size = (uint32)atoi(config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragReportSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.frag_report_switch = (uint32)atoi(config_buff); 
wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragForecastSwitch",config_buff,sizeof(config_buff),"0"); g_frag_prog_para.frag_forecast_switch = (uint32)atoi(config_buff); /*maat*/ wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatTableInfo",table_info_filename,sizeof(table_info_filename),"./avconf/frag_monitor/table_info.conf"); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","FragMonitorJSON",config_buff,sizeof(config_buff),"./avconf/frag_monitor/frag_monitor.json"); int scan_detail = 0; g_frag_prog_para.feather = Maat_feather(g_iThreadNum, table_info_filename, g_frag_prog_para.logger); Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_JSON_FILE_PATH, config_buff, strlen(config_buff)+1); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatStatFile",config_buff,sizeof(config_buff),"./avlog/frag_monitor/maat_stat.log"); Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_STAT_FILE_PATH, config_buff, strlen(config_buff)+1); int temp = 0; wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatStatSwitch",config_buff,sizeof(config_buff),"0"); temp = atoi(config_buff); if(temp) { Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_STAT_ON, NULL, 0); } wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","MaatPerfSwitch",config_buff,sizeof(config_buff),"0"); temp = atoi(config_buff); if(temp) { Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_PERF_ON, NULL, 0); } Maat_set_feather_opt(g_frag_prog_para.feather, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail)); Maat_initiate_feather(g_frag_prog_para.feather); if(NULL==g_frag_prog_para.feather) { printf("Maat_summon_feather_json error.\n"); return -1; } g_frag_prog_para.expr_tableid = Maat_table_register(g_frag_prog_para.feather,"FRAG_MONITOR_KEYWORDS"); g_frag_prog_para.ip_tableid = Maat_table_register(g_frag_prog_para.feather,"FRAG_MONITOR_IP"); //碎片预告 g_frag_prog_para.frag_serverip_tableid = 
Maat_table_register(g_AV_global_feather,"FRAG_SERVER_IP"); //NETWORK //get locat IP wired_cfg_read(g_frag_prog_para.avconf_handle,"Module","SendBack_Device",config_buff,sizeof(config_buff),"mg0"); g_frag_prog_para.local_ip_nr = get_ip_by_ifname(config_buff); if(g_frag_prog_para.local_ip_nr==INADDR_NONE) { printf("get [NETWORK]LocalIP error.\n"); return -1; } #if K_PROJECT #else // index Kafka初始化// wired_cfg_read(g_frag_prog_para.avcomconf_handle,"COMMON","FD_TYPE2_KAFKA_BROKERS",g_frag_prog_para.index_brokers, sizeof(g_frag_prog_para.index_brokers),"0"); g_frag_prog_para.index_kafka_producer = new KafkaProducer(g_frag_prog_para.index_brokers); if(NULL==g_frag_prog_para.index_kafka_producer) { printf("KafkaProducer error.\n"); return -1; } if(0!=g_frag_prog_para.index_kafka_producer->KafkaConnection()) { printf("KafkaConnection %s error.\n", g_frag_prog_para.index_brokers); MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME, "{%s:%d} KafkaConnection %s error.", __FILE__,__LINE__, g_frag_prog_para.index_brokers); } else { printf("KafkaConnection %s succ.\n", g_frag_prog_para.index_brokers); MESA_handle_runtime_log(g_frag_prog_para.logger, RLOG_LV_FATAL, FRGMNT_PLUGIN_NAME, "{%s:%d} KafkaConnection %s succ.", __FILE__,__LINE__, g_frag_prog_para.index_brokers); } #endif //TRACE wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceConfFile",config_buff, sizeof(config_buff),"./avconf/frag_monitor/addr_trace.conf"); g_frag_prog_para.addr_trace = MESA_trace_create_addr_handle(g_frag_prog_para.logger, config_buff); wired_cfg_read(g_frag_prog_para.avconf_handle,"FRAG_MONITOR","TraceLogFile",g_frag_prog_para.trace_dir, sizeof(g_frag_prog_para.trace_dir),"./avlog/frag_monitor/"); return 0; } extern "C" int FRAG_MONITOR_INIT() { if(-1==read_main_conf()) { return -1; } g_iThreadNum = get_thread_count(); /*kafka*/ #if K_PROJECT if(NULL!=g_multi_kafka_producer) { rd_kafka_topic_conf_t* frag_topic_conf = rd_kafka_topic_conf_new(); 
g_frag_prog_para.frag_topic_rkt[MEDIA_INDEX_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_INDEX_DATA, frag_topic_conf); frag_topic_conf = rd_kafka_topic_conf_new(); g_frag_prog_para.frag_topic_rkt[MEDIA_CNVG_TOPIC_RKT] = rd_kafka_topic_new(g_multi_kafka_producer, TOPIC_MEDIA_CONVERGE_DATA, frag_topic_conf); } #else if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_INDEX_DATA)) == NULL) { printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_INDEX_DATA); return -1; } if((g_frag_prog_para.index_kafka_producer->CreateTopicHandle(TOPIC_MEDIA_CONVERGE_DATA)) == NULL) { printf("Kafka CreateTopicHandle %s failed.", TOPIC_MEDIA_CONVERGE_DATA); return -1; } #endif g_frag_prog_para.frag_project_id = project_producer_register("FRAG", PROJECT_VAL_TYPE_CHAR, NULL); return 0; } extern "C" void FRAG_MONITOR_DESTROY(void) { }