/* author: lishu start date: 2015-9-23 function: 1. frag_reassembly use for netdisk and HLS/OSMF av. 2. the difference between netdisk and AV netdisk maybe boundary when upload , need extract, but AV not proc FRAG_CONTENT */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "MESA_handle_logger.h" #include "MESA_prof_load.h" #include "MESA_htable.h" #include "MESA_list_queue.h" #include "MESA_trace.h" #include "stream.h" #include "cJSON.h" #include "interval_index.h" #include "stream_fuzzy_hash.h" #include "soqav_dedup.h" #include "app_detect.h" #include "bizman.h" #include "AV_sendback_in.h" #include "AV_sendback.h" #include "common.h" #include "frag_reassembly_in.h" #include "frag_reassembly.h" #include "frag_proc.h" #include "frag_dedup.h" #include "frag_redis.h" #include "frag_av.h" #include "frag_app.h" #include "frag_voip.h" #include "frag_json.h" #include "frag_dedup.h" #include "av_record.h" #include "sifter.h" #include "service.h" #include "log.h" #include "field_stat2.h" extern frag_rssb_parameter_t g_frag_run; extern frag_rssb_configure_t g_frag_cfg; extern frag_rssb_status_t g_frag_stat; frag_reassembly_t frag_rssb; extern const char* hash_eliminate_type[3]; int expire_media_hash_node(void *data, int eliminate_type) { media_t* mdi = (media_t*)data; switch(eliminate_type) { case ELIMINATE_TYPE_NUM: atomic_inc(&g_frag_stat.sysinfo_stat[MEDIA_HASH][HASH_NUM_EXPIRE]); break; case ELIMINATE_TYPE_TIME: atomic_inc(&g_frag_stat.sysinfo_stat[MEDIA_HASH][HASH_TIME_EXPIRE]); break; default: break; } MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_INFO, FRAG_REASSEMBLY_MODULE_NAME, "{%s:%d} expire_media_hash_node %s: [MID: %llu]", __FILE__,__LINE__, hash_eliminate_type[eliminate_type], mdi->mid); return 1; } void free_text(text_t** pp, int n_p) { text_t* p = *pp; if(NULL!=p) { for(int i=0;itext) { free((p+i)->text); (p+i)->text = NULL; } } } free(p); p = NULL; } *pp = NULL; } void free_opt(opt_in_t** data) { opt_in_t* opt = (opt_in_t*)(*data); if(NULL!=opt) { if(NULL!=opt->opt_value) { free(opt->opt_value); } free(opt); } *data = NULL; } int free_frag_in(void *data, long data_len, void *arg) { frag_in_t* frg_in = (frag_in_t*)data; if(NULL!=frg_in) { if(frg_in->data!=NULL) { free(frg_in->data); } free(frg_in); } return 0; } int64_t renew_time_accord_medialen(uint64_t media_len) { int64_t renew_time = g_frag_cfg.renew_time_min + media_len / 1024 / 1024 * g_frag_cfg.renew_time_step; /* prog_len(MB)*step */ if(renew_time > g_frag_cfg.renew_time_max) { renew_time = g_frag_cfg.renew_time_max; } return(renew_time); } void renew_media(media_t* mdi) { if(NULL!=mdi) { expire_media_write_to_log(mdi, MEDIA_RENEW_EXPIRE, NULL); create_media_write_to_log(mdi, MEDIA_RENEW, NULL); atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_RENEW]); FLAG_CLEAR(mdi->flag, PROG_SEND_META); mdi->create_time = time(NULL); mdi->renew_time = mdi->create_time + renew_time_accord_medialen(mdi->media_len); mdi->pkt_proc = 0; mdi->pkt_in = 0; mdi->byte_proc = 0; mdi->byte_in = 0; for(int i=0;irepeat_reoffset[i] = 0; } IVI_destroy(mdi->ivi, NULL, NULL); mdi->ivi = IVI_create(); } } void save_media(media_t* mdi) { int ivi_seg_cnt = IVI_seg_cnt(mdi->save_ivi); int tmp_cnt = ivi_seg_cnt; IVI_seg_t* ivi_first_seg = IVI_first_seg(mdi->save_ivi); IVI_seg_t* ivi_seg = ivi_first_seg; IVI_seg_t* next_seg; FILE* pFile; char filename[MAX_PATH_LEN] = {0}; char data_path[MAX_PATH_LEN] = {0}; struct tm now; char day_time[32] = {0}; char* suffix = NULL; struct timeval tv; struct timezone tz; gettimeofday(&tv, &tz); localtime_r(&tv.tv_sec, &now); strftime(day_time, sizeof(day_time), "%Y%m%d", &now); if((g_frag_cfg.cpz_type == CPZ_VOIP)&&((mdi->media_type == AUDIO_VIVOX) ||( mdi->media_type==FILE_UNKNOWN))) { return; } if(NULL==ivi_seg) return; suffix = gen_filesuffix_by_mediatype(suffix, mdi->media_type, mdi->proto); mkdir_r(g_frag_cfg.save_media_path); snprintf(data_path, sizeof(data_path), "%s/%s/", g_frag_cfg.save_media_path, day_time); mkdir_r(data_path); int reoffset = 0; snprintf(filename, sizeof(filename), "%s/%s/%lu_%hu.%s", g_frag_cfg.save_media_path, day_time, mdi->mid, reoffset, suffix); pFile = fopen(filename,"r+"); if(NULL==pFile) { pFile = fopen(filename,"w"); fclose(pFile); pFile = fopen(filename,"r+"); } if(NULL!=pFile) { fwrite(ivi_seg->data, ivi_seg->right - ivi_seg->left+1, 1, pFile); } while( -- tmp_cnt) { next_seg = IVI_next_seg(ivi_seg); if(ivi_seg->right == next_seg->left - 1) { fwrite(next_seg->data, next_seg->right - next_seg->left+1, 1, pFile); } else { fflush(pFile); fclose(pFile); reoffset ++; snprintf(filename, sizeof(filename), "%s/%s/%lu_%hu.%s", g_frag_cfg.save_media_path, day_time, mdi->mid, reoffset, suffix); pFile = fopen(filename,"r+"); if(NULL==pFile) { pFile = fopen(filename,"w"); fclose(pFile); pFile = fopen(filename,"r+"); } if(NULL!=pFile) { fwrite(next_seg->data, next_seg->right - next_seg->left+1, 1, pFile); } } ivi_seg = next_seg; } fflush(pFile); fclose(pFile); //free ivi while(ivi_seg_cnt--) { ivi_first_seg = IVI_first_seg(mdi->save_ivi); free(ivi_first_seg->data); ivi_first_seg->data = NULL; IVI_remove(mdi->save_ivi,ivi_first_seg); IVI_seg_free(ivi_first_seg,NULL,NULL); } IVI_destroy(mdi->save_ivi,NULL,NULL); } void free_media(void* data) { media_t* mdi = (media_t*)data; double complte_rate = 0; char filename[MAX_PATH_LEN] = {0}; if(NULL!=mdi) { expire_media_write_to_log(mdi, MEDIA_EXPIRE, NULL); atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_EXPIRE]); if(g_frag_cfg.app_switch) { if(mdi->app_frg_lq) { proc_app_data(mdi); if(NULL!=mdi->app_frg_lq) { MESA_lqueue_destroy(mdi->app_frg_lq, NULL, NULL); mdi->app_frg_lq = NULL; } } if(NULL!=mdi->addrlist) { free(mdi->addrlist); } } if(g_frag_cfg.save_media) { save_media(mdi); } /*survey_json_report*/ if(g_frag_cfg.media_json_switch) { media_json_report(mdi, TOPIC_EVENT_EXPIRE); } if(mdi->media_service_type==MEDIA_SERVICE_TYPE_SIP) { send_sip_log_when_expire(mdi); free_media_sip(mdi); } /*delete monitor file whose size is lower than ...*/ if(g_frag_cfg.all_hit_monitor_switch==ALL_HIT_MONITOR_FULLFILE && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type && g_frag_cfg.all_hit_monitor_complete_rate>0) { complte_rate = (double)mdi->byte_proc/(double)mdi->media_len*100; if(complte_rate < g_frag_cfg.all_hit_monitor_complete_rate) { gen_monitor_file_path(mdi->create_time, mdi->mid, (char*)"flv", filename, MAX_PATH_LEN); remove(filename); } } if(NULL!=mdi->fuzzy) { av_digest_record(mdi); SFH_release(mdi->fuzzy); mdi->fuzzy = NULL; } for(int i=0;iopt_index;j++) { if(NULL!=mdi->opt[i][j]) { free_opt(&mdi->opt[i][j]); } } } if(NULL!=mdi->td_data) { free(mdi->td_data); mdi->td_data = NULL; } if(NULL!=mdi->pid) { free(mdi->pid); } if(NULL!=mdi->monitor_path) { free(mdi->monitor_path); } /*����������*/ struct queue_item* first_item = TAILQ_FIRST(&mdi->query_wait_lq); frag_in_t* frg = NULL; while(first_item != NULL) { struct queue_item *item = TAILQ_NEXT(first_item, entries); TAILQ_REMOVE(&mdi->query_wait_lq, first_item, entries); frg = (frag_in_t*)first_item->node; atomic_inc(&g_frag_stat.sysinfo_stat[MULTISRC_QUEUE][QUEUE_OUT]); frag_write_to_log(ADD_FRAG_FROM_TAILQ, frg->mid, frg, NULL, 0); free_frag_in(frg,0,NULL); free(first_item); first_item = NULL; first_item = item; } for(int i=0;imedia_info[i]) { free_text((&mdi->media_info[i]), 1); } } IVI_destroy(mdi->ivi, NULL, NULL); free(mdi); } } void set_sip_query_task(frag_unit_t* frg_unit, media_t* mdi) { /*voip SIP����*/ frag_write_to_log(SEND_VOIP_QUERY_1, frg_unit->pid, frg_unit, NULL, 0); atomic_inc(&frag_rssb.stat_info[RSSB_INDEX_ONCE_QUERY_SEND]); if(sip_index_query(frg_unit, frg_unit->thread_seq)) { frag_write_to_log(RECV_VOIP_ACK_1, frg_unit->pid, frg_unit, NULL, 0); atomic_inc(&frag_rssb.stat_info[RSSB_INDEX_ONCE_QUERY_RECV]); } else { frag_write_to_log(VOIP_QUERY_FAIL_1, frg_unit->pid, frg_unit, NULL, 0); } proc_sip_opt(frg_unit, mdi); if(mdi->proto==AV_PROTOCOL_SIP) { if(mdi->re_offset==0) { /*���ö�ʱ����:������URL��ʱ����Ҫ���еڶ��ε���������*/ struct timer_context_t *context = (struct timer_context_t *)calloc(1, sizeof(struct timer_context_t)); context->mid = mdi->mid; MESA_timer_add(g_frag_run.index_query_timer[mdi->thread_seq], time(NULL), g_frag_cfg.index_query_timeout, index_query_timeout, context, index_query_timeout_free, &mdi->index_query_timer_idx); g_frag_run.index_query_timer_on[mdi->thread_seq] = 1; } } } void proc_media_opt(frag_unit_t* frg_unit, media_t* mdi) { /*addr for app*/ if(g_frag_cfg.app_switch && NULL!=frg_unit->opt[MEDIA_OPT_ADDR]) { mdi->addrlist = merge_addr(mdi->addrlist, &mdi->addrlist_len, frg_unit->opt[MEDIA_OPT_ADDR]->opt_value, frg_unit->opt[MEDIA_OPT_ADDR]->opt_len, frg_unit->thread_seq); } } void save_media_opt(frag_unit_t* frg_unit, media_t* mdi) { /*���ÿһ��ѡ��*/ for(int i =0;iopt_indexopt[i]) { mdi->opt[i][mdi->opt_index] = frg_unit->opt[i]; frg_unit->opt[i] = NULL; } if(NULL==mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index] && NULL!=mdi->opt[MEDIA_OPT_URL][i]) { mdi->url_opt_index = i; } } if(mdi->opt_indexopt_index++; } } /*ʶ���Ŀ������ ��ͨ����Ƶ ��Ƭ��Ŀ VOIP*/ void set_media_service_type(media_t* mdi) { if(FILE_OSMF==mdi->media_type||FILE_HLS==mdi->media_type) { mdi->media_service_type = MEDIA_SERVICE_TYPE_FRAG; mdi->media_len = 0; } else if(FILE_REQ_FRAG==mdi->media_type) { mdi->media_service_type = MEDIA_SERVICE_TYPE_FRAG; } else if(AV_PROTOCOL_SIP==mdi->proto) { mdi->media_service_type = MEDIA_SERVICE_TYPE_SIP; } } /*proc_unit : frag_unit*/ long media_create_cb(void *data, const uint8_t *key, uint size, void *user_arg) { media_t* mdi = (media_t*)data; frag_unit_t* frg_unit = (frag_unit_t*)user_arg; int in_set = 0; // record cap_ip/PID , update, keep the newest capIP char pbuf[32] = {0}; int buf_len = 32; int media_create_flag = 0; if(NULL==mdi) { mdi = (media_t*)calloc(1,sizeof(media_t)); if(0>(MESA_htable_add(g_frag_run.media_hash,key, size,(const void*)mdi))) { MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME, "{%s:%d} create_media_cb MESA_htable_add error: [mid: %llu]", __FILE__,__LINE__, *(uint64_t*)key); free_media(mdi); return -1; } media_create_flag = 1; /*set mediainfo*/ mdi->mid = frg_unit->mid; mdi->mid_after_multisrc = mdi->mid; mdi->media_len = frg_unit->media_len; /*media type*/ mdi->media_type = frg_unit->media_type; mdi->proto = frg_unit->proto; mdi->hit_service = frg_unit->hitservice; mdi->data_flag = frg_unit->data_flag; mdi->meta_flag = frg_unit->flag; mdi->thread_seq = frg_unit->thread_seq; /*ȷ����Ŀ�Ƿ�����Ƭ����VOIP*/ set_media_service_type(mdi); /*����ѡ��*/ proc_media_opt(frg_unit, mdi); /*����sip��ѯ����*/ if(mdi->media_service_type==MEDIA_SERVICE_TYPE_SIP) { set_sip_query_task(frg_unit, mdi); } /*����ý�����ͼ��*/ if(0!=mdi->media_type) { set_special_media_service(mdi->media_type, mdi); } /*app*/ if(g_frag_cfg.app_switch) { mdi->app_frg_lq = MESA_lqueue_create(1, 0); } mdi->create_time = time(NULL); mdi->renew_time = mdi->create_time + renew_time_accord_medialen(mdi->media_len); for(int i=0;irepeat_reoffset[i] = -1; } /*ivi*/ mdi->ivi = IVI_create(); mdi->save_ivi = IVI_create(); /*��Դ��ѯ�������*/ TAILQ_INIT(&mdi->query_wait_lq); /*fuzzy*/ if(g_frag_cfg.fuzzy_digest_switch) { mdi->fuzzy = SFH_instance(mdi->media_len); } create_media_write_to_log(mdi, MEDIA_NEW, NULL); atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_CREATE]); media_stat(mdi, frg_unit); } else { /*��������Ƶ��Ŀһ��ʱ������������ּ�⣬renew��Ŀ��*/ if(!g_frag_cfg.app_switch && frg_unit->ab_offset==0 && ((MEDIA_SERVICE_TYPE_AV==mdi->media_service_type && mdi->renew_time < time(NULL))||mdi->hit_service==SERVICE_AUDIO_LANG)) { renew_media(mdi); media_create_flag = 1; } } save_media_opt(frg_unit,mdi); /*��Ƭȥ��*/ if(-1==media_frag_removal(mdi, frg_unit)) { return -1; } frag_write_to_log(SET_OFFSET, *(uint64_t*)key, frg_unit, NULL, 0); /*PID�洢*/ mdi->frag_unit_cnt++; if(NULL==mdi->pid) { mdi->pid = (uint64_t*)calloc(1, PID_MAXNUM*sizeof(uint64_t)); } for(int i=0; ipid== mdi->pid[i]) { in_set = 1; break; } } if(!in_set) { mdi->pid[mdi->pid_idx_last] = frg_unit->pid; mdi->pid_idx_last = (mdi->pid_idx_last+1)%PID_MAXNUM; } /*CAP_IP �ش����ݵ�ǰ�˻���IP*/ if(frg_unit->qd_info_from_cpz_idx_last==0) { if(save_qd_info(mdi->qd_info, QD_MAXNUM, &mdi->qdinfo_idx_last,frg_unit->capIP, frg_unit->mid)) { inet_ntop(AF_INET, &frg_unit->capIP, pbuf, buf_len); create_media_write_to_log(mdi, MEDIA_FROM_CAPIP, pbuf); } } /*��Դ�����£�������ƿװ��ǰ����Ϣ�����ڻؽ���*/ for(int i=0;iqd_info_from_cpz_idx_last;i++) { inet_ntop(AF_INET, &frg_unit->capIP, pbuf, buf_len); create_media_write_to_log(mdi, MEDIA_FROM_CPZIP, pbuf); save_qd_info(mdi->qd_info_from_cpz, QD_MAXNUM, &mdi->qdinfo_from_cpz_idx_last, frg_unit->qd_info_from_cpz[i].cap_ip, frg_unit->qd_info_from_cpz[i].mid); } /*monitor because of ��Ƶ����ȫ��Ŀ���*/ /*����Ƶ���ּ��������������ƴװ����������־���е���Ƶԭʼ�ļ���¼*/ audio_lang_monitor_service(frg_unit->src_ip, mdi); /*send ��⽨��*/ #if K_PROJECT if(frg_unit->hitservice >= 0x180 && frg_unit->hitservice < 0x200) { mdi->hit_service = frg_unit->hitservice; config_monitor_service(frg_unit->src_ip, mdi); } #else if(frg_unit->hitservice > 0x80) { mdi->hit_service = frg_unit->hitservice; config_monitor_service(frg_unit->src_ip, mdi); } #endif /*����ȫ��⣬���Dz���ǰ�˷�����*/ if(g_frag_cfg.all_hit_monitor_switch==ALL_HIT_MONITOR_FULLFILE && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type) { FLAG_SET(mdi->flag, PROG_FLAG_DUMP); } /*media_create json*/ if(g_frag_cfg.media_json_switch && media_create_flag) { /*survey_json_report*/ media_json_report(mdi, TOPIC_EVENT_CREATE); } return 0; } int media_create(frag_unit_t* frg_unit) { long rec_cb = 0; /*��鶨ʱ��*/ if(g_frag_run.index_query_timer_on[frg_unit->thread_seq] && g_frag_run.index_query_timer[frg_unit->thread_seq]) { MESA_timer_check(g_frag_run.index_query_timer[frg_unit->thread_seq], time(NULL), g_frag_cfg.index_query_timer_cb_maxtime); } if(0!=frg_unit->mid) { MESA_htable_search_cb(g_frag_run.media_hash,(const uint8_t *)&frg_unit->mid,sizeof(frg_unit->mid), media_create_cb,(void*)frg_unit,&rec_cb); /*av record*/ av_record(frg_unit); } return (int)rec_cb; } void free_frag_unit(void* data) { frag_unit_t* frg_unit = (frag_unit_t*)data; if(NULL!=frg_unit) { atomic_inc(&frag_rssb.stat_info[RSSB_FREE_FRAG_UNIT]); switch(frg_unit->media_type) { case FILE_HLS: case FILE_OSMF: case FILE_REQ_FRAG: case FILE_MAYBE_FRAG: frag_redis_index_twice(frg_unit); break; default: break; } if(NULL!=frg_unit->text) { free_text(&frg_unit->text, frg_unit->text_num); } if(NULL!=frg_unit->frg_info) { for(int i=0;ifrg_info[i], 1); } frg_unit->frg_info = NULL; } for(int i=0;iopt[i]) { free_opt(&frg_unit->opt[i]); } } if(frg_unit->proto==AV_PROTOCOL_SIP) { free_frag_unit_sip_opt(frg_unit); } free(frg_unit); frg_unit = NULL; } } void init_frag_unit(frag_unit_t* frg_unit, uchar protocol) { frg_unit->frg_info = (text_t**)calloc(1, FRAG_UNIT_INFO_NUM*sizeof(text_t*)); } int add_media_info(msg_metainfo_t* minfo, char* opt, uint32_t src_ip, int thread_seq) { long rec_cb = 0; rssb_media_info_t media_info; frag_unit_t* frg_unit = NULL; media_info.pid = *(uint64_t*)minfo->prog_id; media_info.media_type = minfo->media_type; media_info.media_len = minfo->prog_len; media_info.protocol = minfo->protocol; media_info.hitservice = minfo->hitservice; media_info.data_flag = minfo->data_flag; media_info.flag = minfo->flag; media_info.opt_num = minfo->opt_num; media_info.opt = opt; media_info.cap_IP = minfo->cap_IP; media_info.src_ip = src_ip; media_info.thread_seq = thread_seq; /*����Ƶ��Ƭ��*/ if(is_frag(minfo->media_type)) { if(minfo->media_type==FILE_MAYBE_FRAG) { return 0; } MESA_htable_search_cb(frag_rssb.converge_hash, (const uint8_t *)&media_info.pid, sizeof(media_info.pid), converge_mediainfo_search_cb, (void*)&media_info, &rec_cb); } /*VOIPЭ��*/ else if(minfo->protocol==AV_PROTOCOL_SIP) { if(!(g_frag_cfg.voip_filter_switch && minfo->media_type==VOIP_UNKNOWN_MEDIA_TYPE)) { frg_unit = (frag_unit_t*)calloc(1,sizeof(frag_unit_t)); set_sip_frag_unit(&media_info, frg_unit); frg_unit->mid = frg_unit->pid; media_create(frg_unit); free_frag_unit(frg_unit); frg_unit = NULL; } } /*���߳�����*/ else { frg_unit = (frag_unit_t*)calloc(1,sizeof(frag_unit_t)); set_frag_unit(&media_info, frg_unit); /*ǰ���ʼ�û��addr�����ػ�δ���*/ frg_unit->mid = frg_unit->pid; media_create(frg_unit); free_frag_unit(frg_unit); frg_unit = NULL; } return rec_cb; } void trace_store_file(uint64_t mid, uint8_t media_type, uint8_t proto, uint16_t reoffset, uint64_t aboffset, char* data, uint32_t data_len) { FILE* pFile; char filename[MAX_PATH_LEN] = {0}; char data_path[MAX_PATH_LEN] = {0}; struct tm now; char day_time[32] = {0}; char* suffix = NULL; struct timeval tv; struct timezone tz; gettimeofday(&tv, &tz); localtime_r(&tv.tv_sec, &now); strftime(day_time, sizeof(day_time), "%Y%m%d", &now); if((g_frag_cfg.cpz_type == CPZ_VOIP)&&(media_type == AUDIO_VIVOX)) { return; } /*����media_type�����ļ���׺*/ suffix = gen_filesuffix_by_mediatype(suffix, media_type, proto); mkdir_r(g_frag_cfg.store_filepath); snprintf(data_path, sizeof(data_path), "%s/%s/", g_frag_cfg.store_filepath, day_time); mkdir_r(data_path); snprintf(filename, sizeof(filename), "%s/%s/%lu_%hu.%s", g_frag_cfg.store_filepath, day_time, mid, reoffset, suffix); pFile = fopen(filename,"r+"); if(NULL==pFile) { pFile = fopen(filename,"w"); fclose(pFile); pFile = fopen(filename,"r+"); } if(NULL!=pFile) { fseek(pFile,aboffset,SEEK_SET); fwrite(data, data_len, 1, pFile); fflush(pFile); fclose(pFile); } } void tag_frag_in_media(media_t* mdi, frag_in_t* frg) { mdi->pkt_proc++; mdi->byte_proc += frg->datalen; mdi->lastpkt_time = time(NULL); frg->new_mid = mdi->mid_after_multisrc; //frg->create_time = mdi->create_time; //frg->media_type = mdi->media_type; //frg->proto = mdi->proto; if(NULL!=mdi->fuzzy) { mdi->fuzzy_acc_len += SFH_feed(mdi->fuzzy, frg->data, frg->datalen, frg->offset_in); } /*�ڵ�һ�����ݰ�֮ǰ��Ҫ���ͽ�ĿԪ��Ϣ*/ if(!FLAG_TEST(mdi->flag, PROG_SEND_META)) { FLAG_SET(frg->frag_flag, FRAG_FLAG_SEND_META); FLAG_SET(mdi->flag, PROG_SEND_META); } /*�����Ƿ���Ҫ���͸���ϵͳ*/ if(FLAG_TEST(mdi->flag, PROG_FLAG_EXCP)) { FLAG_SET(frg->frag_flag, FRAG_FLAG_WINS); } /*�����Ƿ��Դ�����͸�������ƿװ*/ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_MULTISRC)) { FLAG_SET(frg->frag_flag, FRAG_FLAG_MULTISRC); frg->multisrc_bizmanip = mdi->multisrc_bizmanip; /*��Դ��mid�Ѿ����޸�*/ frg->new_mid = mdi->mid_after_multisrc; } if(g_frag_cfg.app_switch) { MESA_lqueue_join_tail(mdi->app_frg_lq, &frg, sizeof(frg)); atomic_inc(&frag_rssb.sysinfo_stat[RSSB_WAIT_QUEUE][QUEUE_IN]); frag_write_to_log(ADD_FRAG_TO_APP_LQ, frg->mid, frg, NULL, 0); } /*trace debug*/ if(g_frag_cfg.store_filepath_switch) { trace_store_file(frg->mid, mdi->media_type, mdi->proto, frg->seq, frg->offset, frg->data, frg->datalen); } } void copy_frg(frag_in_t* src, frag_in_t* dst) { memcpy(src, dst, sizeof(frag_in_t)); src->data = (char*)malloc(dst->datalen); memcpy(src->data, dst->data, dst->datalen); } int frag_add_tailq(media_t* mdi, frag_in_t* frg, int thread_seq) { frag_in_t* frg_new = (frag_in_t*)malloc(sizeof(frag_in_t)); struct queue_item* item = NULL; copy_frg(frg_new, frg); item = (struct queue_item*)malloc(sizeof(struct queue_item)); item->node = (void*)frg_new; item->thread_seq = thread_seq; TAILQ_INSERT_TAIL(&mdi->query_wait_lq, item, entries); atomic_inc(&g_frag_stat.sysinfo_stat[MULTISRC_QUEUE][QUEUE_IN]); frag_write_to_log(ADD_FRAG_TO_TAILQ, frg_new->mid, frg_new, NULL, 0); return 0; } /*���ض�Դ�ȴ�����,����ͬʱ����ش������Բ���free*/ int frag_add_query_wait_lq(media_t* mdi, frag_ivi_info_t* frag_ivi_info, uint32_t frag_stat) { frag_in_t* frg = frag_ivi_info->frg; if(1==frag_stat) { frag_add_tailq(mdi, frg, frag_ivi_info->thread_seq); } else if(frag_stat==2) { frag_in_t* frg_new = NULL; for(int i=0;ifrg_array_cnt;i++) { frg_new = frag_ivi_info->frg_array[i]; frag_add_tailq(mdi, frg_new, frag_ivi_info->thread_seq); } } return 0; } long media_preproc_cb(void *data, const uint8_t *key, uint size, void *user_arg) { int frag_stat = 0; media_t* mdi = (media_t*)data; frag_ivi_info_t* frag_ivi_info = (frag_ivi_info_t*)user_arg; frag_unit_t* frg_unit = frag_ivi_info->frg_unit; frag_in_t* frg = frag_ivi_info->frg; if(NULL==mdi) { return -1; } /*��Ŀ�յ������ݳ���ͳ��*/ mdi->pkt_in++; mdi->byte_in += frg->datalen; /*���м�����ã�������ȥ��֮ǰд�ļ�*/ if(g_frag_cfg.monitor_file_switch && FLAG_TEST(mdi->flag, PROG_FLAG_DUMP)) { monitor_service_dump_file(frg, mdi); } /*�����Ŀ�ظ�������Ҫ����*/ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_DEDUP) && !g_frag_cfg.dedup_invalid) { frag_ivi_info->td_query = mdi->td_query; return 1; } /*offset==0, log*/ if(0==frg->offset && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type) { update_special_media_service(mdi, frg); /* if(!FLAG_TEST(mdi->flag, PROG_OFFSET_ZERO)) { atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_OFFSET_ZERO]); FLAG_SET(mdi->flag, PROG_OFFSET_ZERO); } create_media_write_to_log(mdi, MEDIA_OFFSET_ZERO, NULL); */ } /*record maxoffset*/ //mdi->maxoffset = MAX(frg->offset, mdi->maxoffset); /*IVI ȥ��*/ if(NULL!=frg) { frag_stat = media_removal(mdi, frg_unit, frg, frag_ivi_info); /*keep frag*/ if(frag_stat==1) { tag_frag_in_media(mdi, frg); media_byte_stat(mdi, frg_unit, frg); } /*new frag*/ else if(frag_stat==2) { /*every frag set info same with frag*/ for(int i=0;ifrg_array_cnt;i++) { frag_ivi_info->frg_array[i]->mid = frg->mid; frag_ivi_info->frg_array[i]->pid = frg->pid; frag_ivi_info->frg_array[i]->offset_in = frag_ivi_info->frg_array[i]->offset; frag_ivi_info->frg_array[i]->seq = frg->seq; frag_ivi_info->frg_array[i]->thread_seq = frg->thread_seq; tag_frag_in_media(mdi, frag_ivi_info->frg_array[i]); media_byte_stat(mdi, frg_unit, frag_ivi_info->frg_array[i]); } } else { return 0; } } /*offset==0, log*/ if(0==frg->offset) { if(!FLAG_TEST(mdi->flag, PROG_OFFSET_ZERO)) { atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_OFFSET_ZERO]); FLAG_SET(mdi->flag, PROG_OFFSET_ZERO); } create_media_write_to_log(mdi, MEDIA_OFFSET_ZERO, NULL); } /*record maxoffset*/ mdi->maxoffset = MAX(frg->offset, mdi->maxoffset); /*���Թ���:�洢��Ч�ļ�dmj*/ if(g_frag_cfg.save_media) { if(g_frag_cfg.cpz_type == CPZ_VOIP) { if(frg->datalen > sizeof(voip_header_t)) { uint32_t datalen = frg->datalen - sizeof(voip_header_t); char* data = (char*)calloc(1,datalen); memcpy(data,frg->data + sizeof(voip_header_t),datalen); IVI_seg_t* a_ivi_seg = IVI_seg_malloc(frg->offset, frg->offset + datalen - 1, data); IVI_insert(mdi->save_ivi,a_ivi_seg); } } else { char* data = (char*)calloc(1,frg->datalen); memcpy(data,frg->data,frg->datalen); IVI_seg_t* a_ivi_seg = IVI_seg_malloc(frg->offset, frg->offset + frg->datalen-1, data); IVI_insert(mdi->save_ivi,a_ivi_seg); } } return (long)frag_stat; } int media_preproc(frag_ivi_info_t* frag_ivi_info) { long rec_cb = -1; frag_in_t* frg = frag_ivi_info->frg; if(0!=frg->mid) { MESA_htable_search_cb(g_frag_run.media_hash,(const uint8_t *)&frg->mid,sizeof(frg->mid), media_preproc_cb,(void*)frag_ivi_info,&rec_cb); } return (int)rec_cb; } /*���������У����Զ�η���*/ void frag_try_add_wait_lq(uint8_t td_query, frag_in_t* frg, int thread_seq) { int lq_rec = 0; //int try_num = 0; atomic_inc(&g_frag_stat.stat_info[TO_SEND][TOTAL_PKTS]); atomic_add(&g_frag_stat.stat_info[TO_SEND][TOTAL_BYTES], frg->datalen); if(FLAG_TEST(td_query, TD_QUERY_RES_DEDUP)) { atomic_inc(&g_frag_stat.stat_info[DEDUP_DROP][TOTAL_PKTS]); atomic_add(&g_frag_stat.stat_info[DEDUP_DROP][TOTAL_BYTES], frg->datalen); } if(!FLAG_TEST(td_query, TD_QUERY_RES_DEDUP) || g_frag_cfg.dedup_invalid) { lq_rec = MESA_lqueue_join_tail(frag_rssb.wait_lq[thread_seq], &frg, sizeof(frg)); if(lq_rec==MESA_QUEUE_RET_OK) { frag_write_to_log(ADD_FRAG_TO_WAIT_LQ, frg->mid, frg, NULL, 0); atomic_inc(&frag_rssb.sysinfo_stat[RSSB_WAIT_QUEUE][QUEUE_IN]); } else { atomic_inc(&g_frag_stat.stat_info[WAIT_QUEUE_FULL_DROP][TOTAL_PKTS]); atomic_add(&g_frag_stat.stat_info[WAIT_QUEUE_FULL_DROP][TOTAL_BYTES], frg->datalen); free_frag_in(frg,0,NULL); } } else { free_frag_in(frg,0,NULL); } } /*���Ͷ���*/ int frag_add_wait_lq(frag_ivi_info_t* frag_ivi_info, uint32_t frag_stat, int thread_seq) { frag_in_t* frg = frag_ivi_info->frg; /*rec=1: IVI ȥ��ֻ��һ��frag rec=2:IVIȥ��ֻ������frag*/ if(1==frag_stat) { frag_try_add_wait_lq(frag_ivi_info->td_query, frg, thread_seq); } else if(frag_stat==2) { frag_in_t* frg_new = NULL; for(int i=0;ifrg_array_cnt;i++) { frg_new = frag_ivi_info->frg_array[i]; frag_try_add_wait_lq(frag_ivi_info->td_query, frg_new, thread_seq); frag_ivi_info->frg_array[i] = NULL; } free_frag_in(frg,0,NULL); } else { free_frag_in(frg,0,NULL); } return 0; } /*����ֵ��-1��*/ int frag_service(frag_ivi_info_t* frag_ivi_info, uint32_t src_ip, int thread_seq) { int frag_stat = 0; frag_ivi_info->thread_seq = thread_seq; frag_ivi_info->mid = frag_ivi_info->frg->mid; /*��Ƭ���������ظ����ݲ��ٴ���*/ if(frag_ivi_info->frg_unit!=NULL && frag_ivi_info->frg_unit->repeat_not_proc) { atomic_inc(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_PKTS]); atomic_add(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_BYTES],frag_ivi_info->frg->datalen); return -1; } /*create media, because no cnvg and no index*/ frag_stat = media_preproc(frag_ivi_info); if(-1==frag_stat) { /*û���ҵ���Ŀ��������û��Ԫ��Ϣ*/ frag_write_to_log(MEDIA_NO_META, frag_ivi_info->frg->pid, NULL, NULL, 0); atomic_inc(&g_frag_stat.stat_info[MEDIA_NOMETA][TOTAL_PKTS]); atomic_add(&g_frag_stat.stat_info[MEDIA_NOMETA][TOTAL_BYTES],frag_ivi_info->frg->datalen); return -1; } /*��Ƭ����������:����У�����*/ if(!g_frag_cfg.app_switch) { frag_add_wait_lq(frag_ivi_info, frag_stat, thread_seq); } return 0; } long converge_data_search_cb(void *data, const uint8_t *key, uint size, void *user_arg) { frag_ivi_info_t* frag_ivi_info = (frag_ivi_info_t*)user_arg; frag_unit_t* frg_unit = (frag_unit_t*)data; frag_in_t* frg = frag_ivi_info->frg; if(NULL==frg_unit) { return -1; } frag_ivi_info->frg_unit = frg_unit; /*��Ƭ���������������IJ��ٴ����������ﶪ�������ٲ��Ŀ�Ĵ���*/ if(frg_unit->repeat_not_proc) { atomic_inc(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_PKTS]); atomic_add(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_BYTES],frg->datalen); free_frag_in(frg,0,NULL); return 0; } /*write log*/ frag_write_to_log(ADD_FRAG, frg->pid, frg, NULL, 0); if(frg_unit->frag_state==STAT_OK) { frg->mid = frg_unit->mid; return 1; } if(frg_unit->frag_state==STAT_CNVG_QUERY) { /*join frag to cnvg_lq*/ MESA_lqueue_join_tail(frg_unit->frg_cnvg_lq, &frg, sizeof(frg)); /*write log*/ frag_write_to_log(ADD_FRAG_TO_CNVG_LQ, frg->mid, frg, NULL, 0); atomic_inc(&frag_rssb.sysinfo_stat[RSSB_CNVG_QUEUE][QUEUE_IN]); } else if(frg_unit->frag_state==STAT_INDEX_QUERY) { MESA_lqueue_join_tail(frg_unit->frg_index_lq, &frg, sizeof(frg)); frag_write_to_log(ADD_FRAG_TO_INDEX_LQ, frg->mid, frg, NULL, 0); atomic_inc(&frag_rssb.sysinfo_stat[RSSB_INDEX_QUEUE][QUEUE_IN]); } return 0; } int add_frag(uint64_t pid, uint64_t offset, char* data, uint32_t datalen, uint8_t protocol, uint32_t src_ip, int thread_seq) { frag_ivi_info_t* frag_ivi_info = (frag_ivi_info_t*)calloc(1, sizeof(frag_ivi_info_t)); frag_in_t* frg = NULL; long rec = 0; frg = (frag_in_t*)calloc(1, sizeof(frag_in_t)); frg->mid = pid; frg->pid = pid; frg->offset = offset; frg->datalen = datalen; frg->data = (char*)malloc(frg->datalen); memcpy(frg->data, data, datalen); frg->thread_seq = thread_seq; frg->src_ip = src_ip; /*������Ӧ��*/ send_ack_to_qd(frg, src_ip, thread_seq); frag_ivi_info->frg = frg; MESA_htable_search_cb(frag_rssb.converge_hash, (const uint8_t *)&pid, sizeof(pid), converge_data_search_cb, (void*)frag_ivi_info, &rec); /*��Ƭ����������׼������*/ if(1==rec) { rec=frag_service(frag_ivi_info, src_ip, thread_seq); if(-1==rec) { free_frag_in(frg,0,NULL); } } /*��ͳ��Ŀ*/ else if(-1==rec) { rec = frag_service(frag_ivi_info, src_ip, thread_seq); /*û��Ԫ��Ϣ,�ͷ�*/ if(-1==rec) { free_frag_in(frg,0,NULL); } } free(frag_ivi_info); return rec; } /*return * -1:��Ŀ������ * 0:������Ԫ��Ϣ * 1:����Ԫ��Ϣ */ long get_media(void *data, const uint8_t *key, uint size, void *user_arg) { media_t* mdi = (media_t*)data; media_info_t* media_info = (media_info_t*)user_arg; int opt_num = 0; if(NULL!=mdi) { media_info->td_query = mdi->td_query; media_info->mdi_flag = mdi->flag; /*media_len*//*mid,protocol,mediatype,file_name*/ media_info->prog_len = mdi->media_len; media_info->mid = mdi->mid_after_multisrc; media_info->protocol = mdi->proto; media_info->data_flag = mdi->data_flag; media_info->flag = mdi->meta_flag; /*��Դ��������۴�ƴװ�ϲ�����ɢ����ʶ*/ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_MULTISRC)) { media_info->hitservice = 0; } else { media_info->hitservice = mdi->hit_service; } /*����ͳ��for data*/ /*����HLS OSMF��������ƬЭ��media_type����Ϊ��ͳ��Ŀ��mediatype*/ if(mdi->media_type==FILE_REQ_FRAG) { media_info->media_type = FILE_AV; media_info->prog_len = 0; } else { media_info->media_type = mdi->media_type; } if(g_frag_cfg.modify_capIP_switch) { media_info->cap_IP = g_frag_cfg.local_ip_nr; } else { media_info->cap_IP = mdi->qd_info[0].cap_ip; } /*��Դ������������ƴ��װ����Ҫ����ѡ��*/ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_MULTISRC)) { /*������ƿװ��ѡ��*/ if(NULL!=mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]) { media_info->opt_unit = (struct opt_unit_t*)calloc(2+mdi->qdinfo_idx_last, sizeof(struct opt_unit_t)); /*META_OPT_LAYER_ADDR*/ media_info->opt_unit[0].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len; media_info->opt_unit[0].opt_type = META_OPT_LAYER_ADDR; media_info->opt_unit[0].opt_value = (char*)calloc(1, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len); memcpy(media_info->opt_unit[0].opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len); opt_num++; /*META_OPT_LAYER_URL*/ media_info->opt_unit[1].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_len; media_info->opt_unit[1].opt_type = META_OPT_LAYER_URL; media_info->opt_unit[1].opt_value = (char*)calloc(1, mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_len); memcpy(media_info->opt_unit[1].opt_value, mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_len); opt_num++; } else { media_info->opt_unit = (struct opt_unit_t*)calloc(1+mdi->qdinfo_idx_last, sizeof(struct opt_unit_t)); /*META_OPT_LAYER_ADDR*/ media_info->opt_unit[0].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len; media_info->opt_unit[0].opt_type = META_OPT_LAYER_ADDR; media_info->opt_unit[0].opt_value = (char*)calloc(1, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len); memcpy(media_info->opt_unit[0].opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len); opt_num++; } /*META_OPT_MID_BEFORE_MULTISRC*/ for(int i=0;iqdinfo_idx_last;i++) { media_info->opt_unit[2+i].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+sizeof(uint64_t)+sizeof(uint32_t); media_info->opt_unit[2+i].opt_type = META_OPT_INFO_BEFORE_MULTISRC; media_info->opt_unit[2+i].opt_value = (char*)calloc(1, sizeof(uint64_t)+sizeof(uint32_t)); memcpy(media_info->opt_unit[2+i].opt_value, &mdi->qd_info[i].mid, sizeof(uint64_t)); memcpy(media_info->opt_unit[2+i].opt_value+sizeof(uint64_t), &mdi->qd_info[i].cap_ip, sizeof(uint32_t)); opt_num++; } } else { /*SIP ѡ��*/ if(mdi->proto==AV_PROTOCOL_SIP && mdi->sip_rate_info!=NULL) { media_info->opt_unit = (struct opt_unit_t*)calloc(1, sizeof(struct opt_unit_t)); media_info->opt_unit->opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->sip_rate_info->opt_len; media_info->opt_unit->opt_type = META_OPT_SIP_SEND_RATE_INFO; media_info->opt_unit->opt_value = (char*)calloc(1, mdi->sip_rate_info->opt_len); memcpy(media_info->opt_unit->opt_value, mdi->sip_rate_info->opt_value, mdi->sip_rate_info->opt_len); opt_num++; } #if K_PROJECT #else if(mdi->proto!=AV_PROTOCOL_SIP) { /*�������ݷ�������ѡ��*/ media_info->opt_unit = (struct opt_unit_t*)calloc(1, sizeof(struct opt_unit_t)); media_info->opt_unit->opt_len = sizeof(uint32_t)+sizeof(uint8_t)+sizeof(unsigned int); media_info->opt_unit->opt_type = OPT_SOURCE_IP; media_info->opt_unit->opt_value = (char*)calloc(1, sizeof(unsigned int)); *(unsigned int*)(media_info->opt_unit->opt_value) = g_frag_cfg.local_ip_nr; opt_num++; } #endif } media_info->opt_num = opt_num; return 1; } return 0; } void frag_reassembly_stats_init(const char* logdir, const char* filename) { char log[MAX_PATH_LEN]={0}; int value = 0; char conf_buf[MAX_PATH_LEN]={0}; memset(conf_buf, 0, sizeof(conf_buf)); frag_rssb.stat_handle = FS_create_handle(); MESA_load_profile_string_def(filename, "LOG", "StatFile", conf_buf, sizeof(conf_buf),"./log/frag_rssb/frag_reassembly_stat.log"); snprintf(log,sizeof(log),"%s/%s", logdir,conf_buf); FS_set_para(frag_rssb.stat_handle, OUTPUT_DEVICE, log, strlen(log)+1); value = 1;//flush by date FS_set_para(frag_rssb.stat_handle, FLUSH_BY_DATE, &value, sizeof(value)); value = 2;//append FS_set_para(frag_rssb.stat_handle, PRINT_MODE, &value, sizeof(value)); MESA_load_profile_short_def(filename, "LOG", "StatInterval", (short*)&frag_rssb.stat_interval,0); FS_set_para(frag_rssb.stat_handle, STAT_CYCLE, &frag_rssb.stat_interval, sizeof(frag_rssb.stat_interval)); value = (frag_rssb.stat_interval!=0) ? 1 : 0; FS_set_para(frag_rssb.stat_handle, PRINT_TRIGGER, &value, sizeof(value)); value = 0; FS_set_para(frag_rssb.stat_handle, CREATE_THREAD, &value, sizeof(value)); FS_set_para(frag_rssb.stat_handle, APP_NAME, g_frag_stat.fs_app, strlen(g_frag_stat.fs_app)+1); if(g_frag_stat.fs_remote_switch) { FS_set_para(frag_rssb.stat_handle, STATS_SERVER_IP, g_frag_stat.fs_ip, strlen(g_frag_stat.fs_ip)+1); FS_set_para(frag_rssb.stat_handle, STATS_SERVER_PORT, &g_frag_stat.fs_port, sizeof(g_frag_stat.fs_port)); } memset(conf_buf, 0, sizeof(conf_buf)); frag_rssb.sysinfo_handle = FS_create_handle(); MESA_load_profile_string_def(filename, "LOG", "SysinfoFile", conf_buf, sizeof(conf_buf),"./log/frag_rssb/frag_reassembly_sysinfo.log"); snprintf(log,sizeof(log),"%s/%s", logdir,conf_buf); FS_set_para(frag_rssb.sysinfo_handle, OUTPUT_DEVICE, log, strlen(log)+1); value = 1;//flush by date FS_set_para(frag_rssb.sysinfo_handle, FLUSH_BY_DATE, &value, sizeof(value)); value = 2;//append FS_set_para(frag_rssb.sysinfo_handle, PRINT_MODE, &value, sizeof(value)); MESA_load_profile_short_def(filename, "LOG", "SysinfoInterval", (short*)&frag_rssb.sysinfo_interval,0); FS_set_para(frag_rssb.sysinfo_handle, STAT_CYCLE, &frag_rssb.sysinfo_interval, sizeof(frag_rssb.sysinfo_interval)); value = (frag_rssb.sysinfo_interval!=0) ? 1 : 0; FS_set_para(frag_rssb.sysinfo_handle, PRINT_TRIGGER, &value, sizeof(value)); value = 0; FS_set_para(frag_rssb.sysinfo_handle, CREATE_THREAD, &value, sizeof(value)); } int read_frag_reassembly_conf(int thread_num, const char* logdir, const char* filename, int* maat_stat_swicth, int* maat_perf_swicth) { int log_level = 0; char log_name[MAX_PATH_LEN]={0}; char log[MAX_PATH_LEN]={0}; /*frag reassembly log*/ MESA_load_profile_short_def(filename, "LOG", "FragReassemblyLogLevel", (short*)&frag_rssb.logger_level,10); MESA_load_profile_string_def(filename, "LOG", "FragReassemblyLogName", log_name , sizeof(log_name),"./log/frag_rssb/frag_reassembly.log"); snprintf(log,sizeof(log),"%s/%s", logdir,log_name); frag_rssb.logger = MESA_create_runtime_log_handle(log,frag_rssb.logger_level); if(NULL==frag_rssb.logger) { printf("read_frag_reassembly_conf: get FragReassemblyLog logger error.\n"); return -1; } /*maat stat log*/ MESA_load_profile_int_def(filename, "LOG", "MaatStatSwitch", maat_stat_swicth, 0); MESA_load_profile_int_def(filename, "LOG", "MaatPerfSwitch", maat_perf_swicth, 0); /*media log*/ memset(log, 0, sizeof(log)); memset(log_name, 0, sizeof(log_name)); MESA_load_profile_short_def(filename, "LOG", "MediaLogLevel", (short*)&log_level,30); MESA_load_profile_string_def(filename, "LOG", "MediaLogName", log_name , sizeof(log_name),"./log/frag_rssb/frag_reassembly_media.log"); snprintf(log,sizeof(log),"%s/%s", logdir,log_name); frag_rssb.media_logger = MESA_create_runtime_log_handle(log,log_level); if(NULL==frag_rssb.media_logger) { printf("read_frag_reassembly_conf: get MediaLog media_logger error.\n"); return -1; } /*sysinfo log*/ frag_reassembly_stats_init(logdir, filename); /*redis cluster init*/ int timeout = 0; MESA_load_profile_int_def(filename, "NETWORK", "RedisClusterSwitch", &frag_rssb.redis_cluster_switch, 1); MESA_load_profile_int_def(filename, "NETWORK", "RedisTimeout", &timeout, 5); MESA_load_profile_string_nodef(filename, "NETWORK", "RedisBrokers", frag_rssb.redis_addr, sizeof(frag_rssb.redis_addr)); //MESA_load_profile_string_nodef(filename, "NETWORK", "RedisNet", frag_rssb.redis_cluster_net, sizeof(frag_rssb.redis_cluster_net)); //MESA_load_profile_int_def(filename, "NETWORK", "RedisUniqIPFlag", &frag_rssb.redis_cluster_netflag, 512); MESA_load_profile_string_nodef(filename, "NETWORK", "RedisIP", frag_rssb.redis_ip, sizeof(frag_rssb.redis_ip)); MESA_load_profile_int_def(filename, "NETWORK", "RedisPort", &frag_rssb.redis_port, 6379); frag_rssb.redis_tv.tv_sec = timeout; frag_rssb.redis_tv.tv_usec = 0; /*wait queue num*/ MESA_load_profile_uint_def(filename, "SYSTEM", "WaitQueueNum", &frag_rssb.wait_lq_num, 5000000); /*init hash size*/ MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashThreadSafe", &frag_rssb.cnvg_hash_thread_safe, 64); MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashSize", &frag_rssb.cnvg_hash_max_elem_num, 1024*1024); MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashElemNum", &frag_rssb.cnvg_hash_size, 1024*1024*16); MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashExpireTime", &frag_rssb.cnvg_hash_expire_time, 120); return 0; } void frag_reassembly_release() { int i=0; if(NULL!=frag_rssb.sifter) { destroy_sifter(frag_rssb.sifter); } if(NULL!=frag_rssb.converge_hash) { MESA_htable_destroy(frag_rssb.converge_hash, free_frag_unit); } for(i=0;i