summaryrefslogtreecommitdiff
path: root/src/frag_reassembly.c
diff options
context:
space:
mode:
authorlishu <[email protected]>2018-09-29 14:57:32 +0800
committerlishu <[email protected]>2018-09-29 14:57:32 +0800
commit19cfcaf353ae4488927fc250361f8baa48f9ffb9 (patch)
tree1cf82bd8c17044090777b067ed16c95b4269466b /src/frag_reassembly.c
20180929 first commit
Diffstat (limited to 'src/frag_reassembly.c')
-rw-r--r--src/frag_reassembly.c1639
1 files changed, 1639 insertions, 0 deletions
diff --git a/src/frag_reassembly.c b/src/frag_reassembly.c
new file mode 100644
index 0000000..b3e8e91
--- /dev/null
+++ b/src/frag_reassembly.c
@@ -0,0 +1,1639 @@
+/*
+author: lishu
+start date: 2015-9-23
+function:
+1. frag_reassembly use for netdisk and HLS/OSMF av.
+2. the difference between netdisk and AV
+ netdisk maybe boundary when upload , need extract, but AV not proc FRAG_CONTENT
+*/
+
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <math.h>
+#include <net/if.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <openssl/md5.h>
+#include <sys/time.h>
+#include <assert.h>
+#include <sys/queue.h>
+#include <assert.h>
+
+
+#include "MESA_handle_logger.h"
+#include "MESA_prof_load.h"
+#include "MESA_htable.h"
+#include "MESA_list_queue.h"
+#include "MESA_trace.h"
+#include "stream.h"
+
+#include "cJSON.h"
+
+#include "interval_index.h"
+#include "stream_fuzzy_hash.h"
+#include "soqav_dedup.h"
+#include "app_detect.h"
+#include "bizman.h"
+
+#include "AV_sendback_in.h"
+#include "AV_sendback.h"
+#include "common.h"
+#include "frag_reassembly_in.h"
+#include "frag_reassembly.h"
+#include "frag_proc.h"
+#include "frag_dedup.h"
+#include "frag_redis.h"
+#include "frag_av.h"
+#include "frag_app.h"
+#include "frag_voip.h"
+#include "frag_json.h"
+#include "frag_dedup.h"
+#include "av_record.h"
+#include "sifter.h"
+#include "service.h"
+#include "log.h"
+#include "field_stat2.h"
+
+extern frag_rssb_parameter_t g_frag_run;
+extern frag_rssb_configure_t g_frag_cfg;
+extern frag_rssb_status_t g_frag_stat;
+
+frag_reassembly_t frag_rssb;
+extern const char* hash_eliminate_type[3];
+
+int expire_media_hash_node(void *data, int eliminate_type)
+{
+ media_t* mdi = (media_t*)data;
+ switch(eliminate_type)
+ {
+ case ELIMINATE_TYPE_NUM:
+ atomic_inc(&g_frag_stat.sysinfo_stat[MEDIA_HASH][HASH_NUM_EXPIRE]);
+ break;
+ case ELIMINATE_TYPE_TIME:
+ atomic_inc(&g_frag_stat.sysinfo_stat[MEDIA_HASH][HASH_TIME_EXPIRE]);
+ break;
+ default:
+ break;
+
+ }
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_INFO, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} expire_media_hash_node %s: [MID: %llu]",
+ __FILE__,__LINE__,
+ hash_eliminate_type[eliminate_type],
+ mdi->mid);
+
+ return 1;
+}
+
+void free_text(text_t** pp, int n_p)
+{
+ text_t* p = *pp;
+ if(NULL!=p)
+ {
+ for(int i=0;i<n_p;i++)
+ {
+ if(NULL!=p+i)
+ {
+ if(NULL!=(p+i)->text)
+ {
+ free((p+i)->text);
+ (p+i)->text = NULL;
+ }
+ }
+ }
+ free(p);
+ p = NULL;
+ }
+ *pp = NULL;
+}
+
+void free_opt(opt_in_t** data)
+{
+ opt_in_t* opt = (opt_in_t*)(*data);
+ if(NULL!=opt)
+ {
+ if(NULL!=opt->opt_value)
+ {
+ free(opt->opt_value);
+ }
+ free(opt);
+ }
+ *data = NULL;
+}
+
+int free_frag_in(void *data, long data_len, void *arg)
+{
+ frag_in_t* frg_in = (frag_in_t*)data;
+ if(NULL!=frg_in)
+ {
+ if(frg_in->data!=NULL)
+ {
+ free(frg_in->data);
+ }
+ free(frg_in);
+ }
+ return 0;
+}
+
+int64_t renew_time_accord_medialen(uint64_t media_len)
+{
+ int64_t renew_time = g_frag_cfg.renew_time_min + media_len / 1024 / 1024 * g_frag_cfg.renew_time_step; /* prog_len(MB)*step */
+ if(renew_time > g_frag_cfg.renew_time_max)
+ {
+ renew_time = g_frag_cfg.renew_time_max;
+ }
+ return(renew_time);
+}
+
+void renew_media(media_t* mdi)
+{
+ if(NULL!=mdi)
+ {
+ expire_media_write_to_log(mdi, MEDIA_RENEW_EXPIRE, NULL);
+ create_media_write_to_log(mdi, MEDIA_RENEW, NULL);
+ atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_RENEW]);
+
+ FLAG_CLEAR(mdi->flag, PROG_SEND_META);
+ mdi->create_time = time(NULL);
+ mdi->renew_time = mdi->create_time + renew_time_accord_medialen(mdi->media_len);
+ mdi->pkt_proc = 0;
+ mdi->pkt_in = 0;
+ mdi->byte_proc = 0;
+ mdi->byte_in = 0;
+ for(int i=0;i<KEEP_REOFFSET_MAXNUM;i++)
+ {
+ mdi->repeat_reoffset[i] = 0;
+ }
+ IVI_destroy(mdi->ivi, NULL, NULL);
+ mdi->ivi = IVI_create();
+ }
+}
+
+void save_media(media_t* mdi)
+{
+ int ivi_seg_cnt = IVI_seg_cnt(mdi->save_ivi);
+ int tmp_cnt = ivi_seg_cnt;
+ IVI_seg_t* ivi_first_seg = IVI_first_seg(mdi->save_ivi);
+ IVI_seg_t* ivi_seg = ivi_first_seg;
+ IVI_seg_t* next_seg;
+
+ FILE* pFile;
+ char filename[MAX_PATH_LEN] = {0};
+ char data_path[MAX_PATH_LEN] = {0};
+ struct tm now;
+ char day_time[32] = {0};
+ char* suffix = NULL;
+ struct timeval tv;
+ struct timezone tz;
+ gettimeofday(&tv, &tz);
+ localtime_r(&tv.tv_sec, &now);
+ strftime(day_time, sizeof(day_time), "%Y%m%d", &now);
+
+ if((g_frag_cfg.cpz_type == CPZ_VOIP)&&((mdi->media_type == AUDIO_VIVOX) ||( mdi->media_type==FILE_UNKNOWN)))
+ {
+ return;
+ }
+ if(NULL==ivi_seg) return;
+ suffix = gen_filesuffix_by_mediatype(suffix, mdi->media_type, mdi->proto);
+
+ mkdir_r(g_frag_cfg.save_media_path);
+ snprintf(data_path, sizeof(data_path), "%s/%s/", g_frag_cfg.save_media_path, day_time);
+ mkdir_r(data_path);
+
+ int reoffset = 0;
+ snprintf(filename, sizeof(filename), "%s/%s/%lu_%hu.%s", g_frag_cfg.save_media_path, day_time, mdi->mid, reoffset, suffix);
+
+ pFile = fopen(filename,"r+");
+ if(NULL==pFile)
+ {
+ pFile = fopen(filename,"w");
+ fclose(pFile);
+ pFile = fopen(filename,"r+");
+ }
+ if(NULL!=pFile)
+ {
+ fwrite(ivi_seg->data, ivi_seg->right - ivi_seg->left+1, 1, pFile);
+ }
+
+ while( -- tmp_cnt)
+ {
+ next_seg = IVI_next_seg(ivi_seg);
+ if(ivi_seg->right == next_seg->left - 1)
+ {
+ fwrite(next_seg->data, next_seg->right - next_seg->left+1, 1, pFile);
+ }
+ else
+ {
+ fflush(pFile);
+ fclose(pFile);
+ reoffset ++;
+ snprintf(filename, sizeof(filename), "%s/%s/%lu_%hu.%s", g_frag_cfg.save_media_path, day_time, mdi->mid, reoffset, suffix);
+ pFile = fopen(filename,"r+");
+ if(NULL==pFile)
+ {
+ pFile = fopen(filename,"w");
+ fclose(pFile);
+ pFile = fopen(filename,"r+");
+ }
+ if(NULL!=pFile)
+ {
+ fwrite(next_seg->data, next_seg->right - next_seg->left+1, 1, pFile);
+ }
+ }
+ ivi_seg = next_seg;
+ }
+ fflush(pFile);
+ fclose(pFile);
+
+ //free ivi
+ while(ivi_seg_cnt--)
+ {
+ ivi_first_seg = IVI_first_seg(mdi->save_ivi);
+ free(ivi_first_seg->data);
+ ivi_first_seg->data = NULL;
+ IVI_remove(mdi->save_ivi,ivi_first_seg);
+ IVI_seg_free(ivi_first_seg,NULL,NULL);
+ }
+ IVI_destroy(mdi->save_ivi,NULL,NULL);
+}
+
+void free_media(void* data)
+{
+ media_t* mdi = (media_t*)data;
+ double complte_rate = 0;
+ char filename[MAX_PATH_LEN] = {0};
+
+ if(NULL!=mdi)
+ {
+ expire_media_write_to_log(mdi, MEDIA_EXPIRE, NULL);
+ atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_EXPIRE]);
+
+ if(g_frag_cfg.app_switch)
+ {
+ if(mdi->app_frg_lq)
+ {
+ proc_app_data(mdi);
+ if(NULL!=mdi->app_frg_lq)
+ {
+ MESA_lqueue_destroy(mdi->app_frg_lq, NULL, NULL);
+ mdi->app_frg_lq = NULL;
+ }
+ }
+ if(NULL!=mdi->addrlist)
+ {
+ free(mdi->addrlist);
+ }
+ }
+
+ if(g_frag_cfg.save_media)
+ {
+ save_media(mdi);
+ }
+
+ /*survey_json_report*/
+ if(g_frag_cfg.media_json_switch)
+ {
+ media_json_report(mdi, TOPIC_EVENT_EXPIRE);
+ }
+
+ /*av_dup report upload*/
+ if(g_frag_cfg.av_dedup_switch && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type)
+ {
+ media_dedup_report(mdi);
+ }
+ if(mdi->media_service_type==MEDIA_SERVICE_TYPE_SIP)
+ {
+ send_sip_log_when_expire(mdi);
+ free_media_sip(mdi);
+ }
+ /*delete monitor file whose size is lower than ...*/
+ if(g_frag_cfg.all_hit_monitor_switch==ALL_HIT_MONITOR_FULLFILE && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type && g_frag_cfg.all_hit_monitor_complete_rate>0)
+ {
+ complte_rate = (double)mdi->byte_proc/(double)mdi->media_len*100;
+ if(complte_rate < g_frag_cfg.all_hit_monitor_complete_rate)
+ {
+ gen_monitor_file_path(mdi->create_time, mdi->mid, (char*)"flv", filename, MAX_PATH_LEN);
+ remove(filename);
+ }
+ }
+
+ if(NULL!=mdi->fuzzy)
+ {
+ av_digest_record(mdi);
+ SFH_release(mdi->fuzzy);
+ mdi->fuzzy = NULL;
+ }
+
+ for(int i=0;i<MEDIA_OPT_MAXNUN;i++)
+ {
+ for(int j=0;j<mdi->opt_index;j++)
+ {
+ if(NULL!=mdi->opt[i][j])
+ {
+ free_opt(&mdi->opt[i][j]);
+ }
+ }
+ }
+ if(NULL!=mdi->td_data)
+ {
+ free(mdi->td_data);
+ mdi->td_data = NULL;
+ }
+ if(NULL!=mdi->pid)
+ {
+ free(mdi->pid);
+ }
+ if(NULL!=mdi->monitor_path)
+ {
+ free(mdi->monitor_path);
+ }
+
+ /*����������*/
+ struct queue_item* first_item = TAILQ_FIRST(&mdi->query_wait_lq);
+ frag_in_t* frg = NULL;
+ while(first_item != NULL)
+ {
+ struct queue_item *item = TAILQ_NEXT(first_item, entries);
+ TAILQ_REMOVE(&mdi->query_wait_lq, first_item, entries);
+ frg = (frag_in_t*)first_item->node;
+ atomic_inc(&g_frag_stat.sysinfo_stat[MULTISRC_QUEUE][QUEUE_OUT]);
+ frag_write_to_log(ADD_FRAG_FROM_TAILQ, frg->mid, frg, NULL, 0);
+ free_frag_in(frg,0,NULL);
+ free(first_item);
+ first_item = NULL;
+ first_item = item;
+ }
+
+ for(int i=0;i<INFO_MEDIA_NUM;i++)
+ {
+ if(NULL!=mdi->media_info[i])
+ {
+ free_text((&mdi->media_info[i]), 1);
+ }
+ }
+ IVI_destroy(mdi->ivi, NULL, NULL);
+ free(mdi);
+ }
+}
+
+void set_sip_query_task(frag_unit_t* frg_unit, media_t* mdi)
+{
+ /*voip SIP����*/
+ frag_write_to_log(SEND_VOIP_QUERY_1, frg_unit->pid, frg_unit, NULL, 0);
+ atomic_inc(&frag_rssb.stat_info[RSSB_INDEX_ONCE_QUERY_SEND]);
+ if(sip_index_query(frg_unit, frg_unit->thread_seq))
+ {
+ frag_write_to_log(RECV_VOIP_ACK_1, frg_unit->pid, frg_unit, NULL, 0);
+ atomic_inc(&frag_rssb.stat_info[RSSB_INDEX_ONCE_QUERY_RECV]);
+ }
+ else
+ {
+ frag_write_to_log(VOIP_QUERY_FAIL_1, frg_unit->pid, frg_unit, NULL, 0);
+ }
+ proc_sip_opt(frg_unit, mdi);
+
+ if(mdi->proto==AV_PROTOCOL_SIP)
+ {
+ if(mdi->re_offset==0)
+ {
+ /*���ö�ʱ����:������URL��ʱ����Ҫ���еڶ��ε���������*/
+ struct timer_context_t *context = (struct timer_context_t *)calloc(1, sizeof(struct timer_context_t));
+ context->mid = mdi->mid;
+ MESA_timer_add(g_frag_run.index_query_timer[mdi->thread_seq],
+ time(NULL),
+ g_frag_cfg.index_query_timeout,
+ index_query_timeout,
+ context,
+ index_query_timeout_free,
+ &mdi->index_query_timer_idx);
+ }
+ }
+}
+
+void proc_media_opt(frag_unit_t* frg_unit, media_t* mdi)
+{
+ /*addr for app*/
+ if(g_frag_cfg.app_switch && NULL!=frg_unit->opt[MEDIA_OPT_ADDR])
+ {
+ mdi->addrlist = merge_addr(mdi->addrlist, &mdi->addrlist_len, frg_unit->opt[MEDIA_OPT_ADDR]->opt_value, frg_unit->opt[MEDIA_OPT_ADDR]->opt_len, frg_unit->thread_seq);
+ }
+}
+void save_media_opt(frag_unit_t* frg_unit, media_t* mdi)
+{
+ /*���ÿһ��ѡ��*/
+ for(int i =0;i<MEDIA_OPT_MAXNUN;i++)
+ {
+ if(mdi->opt_index<OPT_MAXNUN && NULL!=frg_unit->opt[i])
+ {
+ mdi->opt[i][mdi->opt_index] = frg_unit->opt[i];
+ frg_unit->opt[i] = NULL;
+ }
+ if(NULL==mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index] && NULL!=mdi->opt[MEDIA_OPT_URL][i])
+ {
+ mdi->url_opt_index = i;
+ }
+ }
+ if(mdi->opt_index<OPT_MAXNUN)
+ {
+ mdi->opt_index++;
+ }
+
+ /*���ö��β�ѯ�Ķ�ʱ����*/
+ if(g_frag_cfg.av_dedup_switch && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type)
+ {
+ if(NULL!=mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index])
+ {
+ /*����URL���ж�Դ��ѯ*/
+ if(!FLAG_TEST(mdi->td_query,TD_QUERY_TYPE_YES))
+ {
+ FLAG_SET(mdi->td_query,TD_QUERY_TYPE_MULTISRC);
+ }
+ }
+
+ /*����URLҲ�����˲�ѯ��������Ϊ��ͳ��*/
+ if(NULL==mdi->index_query_timer_idx)
+ {
+ /*���ö�ʱ����:������URL��ʱ����Ҫ���еڶ��ε���������*/
+ struct timer_context_t *context = (struct timer_context_t *)calloc(1, sizeof(struct timer_context_t));
+ context->mid = mdi->mid;
+ MESA_timer_add(g_frag_run.index_query_timer[mdi->thread_seq],
+ time(NULL),
+ g_frag_cfg.index_query_timeout,
+ index_query_timeout,
+ context,
+ index_query_timeout_free,
+ &mdi->index_query_timer_idx);
+ }
+ }
+}
+
+/*ʶ���Ŀ������ ��ͨ����Ƶ ��Ƭ��Ŀ VOIP*/
+void set_media_service_type(media_t* mdi)
+{
+ if(FILE_OSMF==mdi->media_type||FILE_HLS==mdi->media_type)
+ {
+ mdi->media_service_type = MEDIA_SERVICE_TYPE_FRAG;
+ mdi->media_len = 0;
+ }
+ else if(FILE_FRAG==mdi->media_type)
+ {
+ mdi->media_service_type = MEDIA_SERVICE_TYPE_FRAG;
+ }
+ else if(AV_PROTOCOL_SIP==mdi->proto)
+ {
+ mdi->media_service_type = MEDIA_SERVICE_TYPE_SIP;
+ }
+}
+
+/*proc_unit : frag_unit*/
+long media_create_cb(void *data, const uint8_t *key, uint size, void *user_arg)
+{
+ media_t* mdi = (media_t*)data;
+ frag_unit_t* frg_unit = (frag_unit_t*)user_arg;
+ int in_set = 0; // record cap_ip/PID , update, keep the newest capIP
+ char pbuf[32] = {0};
+ int buf_len = 32;
+ int media_create_flag = 0;
+
+ if(NULL==mdi)
+ {
+ mdi = (media_t*)calloc(1,sizeof(media_t));
+ if(0>(MESA_htable_add(g_frag_run.media_hash,key, size,(const void*)mdi)))
+ {
+ MESA_handle_runtime_log(g_frag_run.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} create_media_cb MESA_htable_add error: [mid: %llu]",
+ __FILE__,__LINE__, *(uint64_t*)key);
+ free_media(mdi);
+ return -1;
+ }
+ media_create_flag = 1;
+
+ /*set mediainfo*/
+ mdi->mid = frg_unit->mid;
+ mdi->mid_after_multisrc = mdi->mid;
+ mdi->media_len = frg_unit->media_len;
+ /*media type*/
+ mdi->media_type = frg_unit->media_type;
+ mdi->proto = frg_unit->proto;
+ mdi->hit_service = frg_unit->hitservice;
+ mdi->data_flag = frg_unit->data_flag;
+ mdi->meta_flag = frg_unit->flag;
+ mdi->thread_seq = frg_unit->thread_seq;
+
+ /*ȷ����Ŀ�Ƿ�����Ƭ����VOIP*/
+ set_media_service_type(mdi);
+
+ /*����ѡ��*/
+ proc_media_opt(frg_unit, mdi);
+
+ /*����sip��ѯ����*/
+ if(mdi->media_service_type==MEDIA_SERVICE_TYPE_SIP)
+ {
+ set_sip_query_task(frg_unit, mdi);
+ }
+
+ /*����ý�����ͼ��*/
+ if(0!=mdi->media_type)
+ {
+ set_special_media_service(mdi->media_type, mdi);
+ }
+
+ /*app*/
+ if(g_frag_cfg.app_switch)
+ {
+ mdi->app_frg_lq = MESA_lqueue_create(1, 0);
+ }
+
+ mdi->create_time = time(NULL);
+ mdi->renew_time = mdi->create_time + renew_time_accord_medialen(mdi->media_len);
+
+ for(int i=0;i<KEEP_REOFFSET_MAXNUM;i++)
+ {
+ mdi->repeat_reoffset[i] = -1;
+ }
+ /*ivi*/
+ mdi->ivi = IVI_create();
+ mdi->save_ivi = IVI_create();
+
+ /*��Դ��ѯ�������*/
+ TAILQ_INIT(&mdi->query_wait_lq);
+
+ /*fuzzy*/
+ if(g_frag_cfg.fuzzy_digest_switch)
+ {
+ mdi->fuzzy = SFH_instance(mdi->media_len);
+ }
+ create_media_write_to_log(mdi, MEDIA_NEW, NULL);
+ atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_CREATE]);
+ media_stat(mdi, frg_unit);
+ }
+ else
+ {
+ /*��������Ƶ��Ŀһ��ʱ������������ּ�⣬renew��Ŀ��*/
+ if(!g_frag_cfg.app_switch && frg_unit->ab_offset==0
+ && ((MEDIA_SERVICE_TYPE_AV==mdi->media_service_type && mdi->renew_time < time(NULL))||mdi->hit_service==SERVICE_AUDIO_LANG))
+ {
+ renew_media(mdi);
+ media_create_flag = 1;
+ }
+ }
+ save_media_opt(frg_unit,mdi);
+ /*av_dedup*/
+ if(g_frag_cfg.av_dedup_switch
+ && mdi->td_complete==0
+ && g_frag_cfg.td_data_maxsize==0
+ && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type
+ && NULL!=mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index])
+ {
+ generate_td_meta(mdi);
+ FLAG_SET(mdi->td_query,TD_QUERY_TYPE_DEDUP);
+ caculate_md5(mdi->opt[MEDIA_OPT_TD_META][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_TD_META][mdi->url_opt_index]->opt_len, NULL, 0, mdi->td, TD_LEN);
+ mdi->td_complete = 1;
+ }
+
+ /*��Ƭȥ��*/
+ if(-1==media_frag_removal(mdi, frg_unit))
+ {
+ return -1;
+ }
+ frag_write_to_log(SET_OFFSET, *(uint64_t*)key, frg_unit, NULL, 0);
+
+ /*PID�洢*/
+ mdi->frag_unit_cnt++;
+ if(NULL==mdi->pid)
+ {
+ mdi->pid = (uint64_t*)calloc(1, PID_MAXNUM*sizeof(uint64_t));
+ }
+ for(int i=0; i<PID_MAXNUM; i++)
+ {
+ if(frg_unit->pid== mdi->pid[i])
+ {
+ in_set = 1;
+ break;
+ }
+ }
+ if(!in_set)
+ {
+ mdi->pid[mdi->pid_idx_last] = frg_unit->pid;
+ mdi->pid_idx_last = (mdi->pid_idx_last+1)%PID_MAXNUM;
+ }
+
+ /*CAP_IP �ش����ݵ�ǰ�˻���IP*/
+ if(frg_unit->qd_info_from_cpz_idx_last==0)
+ {
+ if(save_qd_info(mdi->qd_info, QD_MAXNUM, &mdi->qdinfo_idx_last,frg_unit->capIP, frg_unit->mid))
+ {
+ inet_ntop(AF_INET, &frg_unit->capIP, pbuf, buf_len);
+ create_media_write_to_log(mdi, MEDIA_FROM_CAPIP, pbuf);
+ }
+ }
+ /*��Դ�����£�������ƿװ��ǰ����Ϣ�����ڻؽ���*/
+ for(int i=0;i<frg_unit->qd_info_from_cpz_idx_last;i++)
+ {
+ inet_ntop(AF_INET, &frg_unit->capIP, pbuf, buf_len);
+ create_media_write_to_log(mdi, MEDIA_FROM_CPZIP, pbuf);
+ save_qd_info(mdi->qd_info_from_cpz, QD_MAXNUM, &mdi->qdinfo_from_cpz_idx_last, frg_unit->qd_info_from_cpz[i].cap_ip, frg_unit->qd_info_from_cpz[i].mid);
+ }
+
+ /*monitor because of ��Ƶ����ȫ��Ŀ���*/
+ /*����Ƶ���ּ��������������ƴװ����������־���е���Ƶԭʼ�ļ���¼*/
+ audio_lang_monitor_service(frg_unit->src_ip, mdi);
+ /*send ��⽨��*/
+#if K_PROJECT
+ if(frg_unit->hitservice >= 0x180 && frg_unit->hitservice < 0x200)
+ {
+ mdi->hit_service = frg_unit->hitservice;
+ config_monitor_service(frg_unit->src_ip, mdi);
+ }
+#else
+ if(frg_unit->hitservice > 0x80)
+ {
+ mdi->hit_service = frg_unit->hitservice;
+ config_monitor_service(frg_unit->src_ip, mdi);
+ }
+#endif
+
+ /*����ȫ��⣬���Dz���ǰ�˷�����*/
+ if(g_frag_cfg.all_hit_monitor_switch==ALL_HIT_MONITOR_FULLFILE && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type)
+ {
+ FLAG_SET(mdi->flag, PROG_FLAG_DUMP);
+ }
+
+ /*media_create json*/
+ if(g_frag_cfg.media_json_switch && media_create_flag)
+ {
+ /*survey_json_report*/
+ media_json_report(mdi, TOPIC_EVENT_CREATE);
+ }
+ return 0;
+}
+
+int media_create(frag_unit_t* frg_unit)
+{
+ long rec_cb = 0;
+
+ /*��鶨ʱ��*/
+ if(g_frag_cfg.av_dedup_switch && g_frag_run.multisrc_timer[frg_unit->thread_seq])
+ {
+ MESA_timer_check(g_frag_run.multisrc_timer[frg_unit->thread_seq], time(NULL), g_frag_cfg.multisrc_timer_cb_maxtime);
+ }
+ /*��鶨ʱ��*/
+ if(g_frag_run.index_query_timer[frg_unit->thread_seq])
+ {
+ MESA_timer_check(g_frag_run.index_query_timer[frg_unit->thread_seq], time(NULL), g_frag_cfg.index_query_timer_cb_maxtime);
+ }
+
+ if(0!=frg_unit->mid)
+ {
+ MESA_htable_search_cb(g_frag_run.media_hash,(const uint8_t *)&frg_unit->mid,sizeof(frg_unit->mid),
+ media_create_cb,(void*)frg_unit,&rec_cb);
+ /*av record*/
+ av_record(frg_unit);
+ }
+ return (int)rec_cb;
+}
+
+void free_frag_unit(void* data)
+{
+ frag_unit_t* frg_unit = (frag_unit_t*)data;
+ if(NULL!=frg_unit)
+ {
+ atomic_inc(&frag_rssb.stat_info[RSSB_FREE_FRAG_UNIT]);
+ switch(frg_unit->media_type)
+ {
+ case FILE_HLS:
+ case FILE_OSMF:
+ case FILE_FRAG:
+ case FILE_MAYBE_FRAG:
+ frag_redis_index_twice(frg_unit);
+ break;
+
+ default:
+ break;
+ }
+
+ if(NULL!=frg_unit->text)
+ {
+ free_text(&frg_unit->text, frg_unit->text_num);
+ }
+ if(NULL!=frg_unit->frg_info)
+ {
+ for(int i=0;i<FRAG_UNIT_INFO_NUM;i++)
+ {
+ free_text(&frg_unit->frg_info[i], 1);
+ }
+ frg_unit->frg_info = NULL;
+ }
+ for(int i=0;i<MEDIA_OPT_MAXNUN;i++)
+ {
+ if(NULL!=frg_unit->opt[i])
+ {
+ free_opt(&frg_unit->opt[i]);
+ }
+ }
+
+ if(frg_unit->proto==AV_PROTOCOL_SIP)
+ {
+ free_frag_unit_sip_opt(frg_unit);
+ }
+ free(frg_unit);
+ frg_unit = NULL;
+ }
+}
+
+void init_frag_unit(frag_unit_t* frg_unit, uchar protocol)
+{
+ frg_unit->frg_info = (text_t**)calloc(1, FRAG_UNIT_INFO_NUM*sizeof(text_t*));
+}
+
+int add_media_info(msg_metainfo_t* minfo, char* opt, uint32_t src_ip, int thread_seq)
+{
+ long rec_cb = 0;
+ rssb_media_info_t media_info;
+ frag_unit_t* frg_unit = NULL;
+
+ media_info.pid = *(uint64_t*)minfo->prog_id;
+ media_info.media_type = minfo->media_type;
+ media_info.media_len = minfo->prog_len;
+ media_info.protocol = minfo->protocol;
+ media_info.hitservice = minfo->hitservice;
+ media_info.data_flag = minfo->data_flag;
+ media_info.flag = minfo->flag;
+ media_info.opt_num = minfo->opt_num;
+ media_info.opt = opt;
+ media_info.cap_IP = minfo->cap_IP;
+ media_info.src_ip = src_ip;
+ media_info.thread_seq = thread_seq;
+
+ /*����Ƶ��Ƭ��*/
+ if(is_frag(minfo->media_type))
+ {
+ if(minfo->media_type==FILE_MAYBE_FRAG)
+ {
+ return 0;
+ }
+ MESA_htable_search_cb(frag_rssb.converge_hash, (const uint8_t *)&media_info.pid, sizeof(media_info.pid),
+ converge_mediainfo_search_cb, (void*)&media_info, &rec_cb);
+ }
+ /*VOIP��*/
+ else if(minfo->protocol==AV_PROTOCOL_SIP)
+ {
+ if(!(g_frag_cfg.voip_filter_switch && minfo->media_type==VOIP_UNKNOWN_MEDIA_TYPE))
+ {
+ frg_unit = (frag_unit_t*)calloc(1,sizeof(frag_unit_t));
+ set_sip_frag_unit(&media_info, frg_unit);
+ frg_unit->mid = frg_unit->pid;
+ media_create(frg_unit);
+ free_frag_unit(frg_unit);
+ frg_unit = NULL;
+ }
+ }
+ /*���߳�����*/
+ else
+ {
+ frg_unit = (frag_unit_t*)calloc(1,sizeof(frag_unit_t));
+ set_frag_unit(&media_info, frg_unit);
+ /*ǰ���ʼ�û��addr�����ػ�δ���*/
+ if(g_frag_cfg.av_dedup_switch && frg_unit->opt[MEDIA_OPT_ADDR]==NULL)
+ {
+ free_frag_unit(frg_unit);
+ return 0;
+ }
+ /*��������Դ��ѯ*/
+ if(g_frag_cfg.av_dedup_switch && NULL==frg_unit->opt[MEDIA_OPT_URL] && NULL!=frg_unit->opt[MEDIA_OPT_SINGLE_KEY])
+ {
+ frag_write_to_log(SEND_AV_QUERY_1, frg_unit->pid, frg_unit, NULL, 0);
+ atomic_inc(&frag_rssb.stat_info[RSSB_AV_ONCE_QUERY_SEND]);
+ if(av_query(frg_unit))
+ {
+ frag_write_to_log(RECV_AV_ACK_1, frg_unit->pid, frg_unit, NULL, 0);
+ atomic_inc(&frag_rssb.stat_info[RSSB_AV_ONCE_QUERY_RECV]);
+ }
+ else
+ {
+ frag_write_to_log(AV_QUERY_FAIL_1, frg_unit->pid, frg_unit, NULL, 0);
+ }
+ }
+ frg_unit->mid = frg_unit->pid;
+ media_create(frg_unit);
+ free_frag_unit(frg_unit);
+ frg_unit = NULL;
+ }
+ return rec_cb;
+}
+
+void trace_store_file(uint64_t mid, uint8_t media_type, uint8_t proto, uint16_t reoffset, uint64_t aboffset, char* data, uint32_t data_len)
+{
+ FILE* pFile;
+ char filename[MAX_PATH_LEN] = {0};
+ char data_path[MAX_PATH_LEN] = {0};
+ struct tm now;
+ char day_time[32] = {0};
+ char* suffix = NULL;
+ struct timeval tv;
+ struct timezone tz;
+ gettimeofday(&tv, &tz);
+ localtime_r(&tv.tv_sec, &now);
+ strftime(day_time, sizeof(day_time), "%Y%m%d", &now);
+
+ if((g_frag_cfg.cpz_type == CPZ_VOIP)&&(media_type == AUDIO_VIVOX))
+ {
+ return;
+ }
+
+ /*����media_type�����ļ���׺*/
+ suffix = gen_filesuffix_by_mediatype(suffix, media_type, proto);
+
+ mkdir_r(g_frag_cfg.store_filepath);
+ snprintf(data_path, sizeof(data_path), "%s/%s/", g_frag_cfg.store_filepath, day_time);
+ mkdir_r(data_path);
+ snprintf(filename, sizeof(filename), "%s/%s/%lu_%hu.%s", g_frag_cfg.store_filepath, day_time, mid, reoffset, suffix);
+ pFile = fopen(filename,"r+");
+ if(NULL==pFile)
+ {
+ pFile = fopen(filename,"w");
+ fclose(pFile);
+ pFile = fopen(filename,"r+");
+ }
+ if(NULL!=pFile)
+ {
+ fseek(pFile,aboffset,SEEK_SET);
+ fwrite(data, data_len, 1, pFile);
+ fflush(pFile);
+ fclose(pFile);
+ }
+}
+
+void tag_frag_in_media(media_t* mdi, frag_in_t* frg)
+{
+ mdi->pkt_proc++;
+ mdi->byte_proc += frg->datalen;
+ mdi->lastpkt_time = time(NULL);
+
+ frg->new_mid = mdi->mid_after_multisrc;
+ //frg->create_time = mdi->create_time;
+ //frg->media_type = mdi->media_type;
+ //frg->proto = mdi->proto;
+
+ if(NULL!=mdi->fuzzy)
+ {
+ mdi->fuzzy_acc_len += SFH_feed(mdi->fuzzy, frg->data, frg->datalen, frg->offset_in);
+ }
+ if(g_frag_cfg.av_dedup_switch && !FLAG_TEST(mdi->td_query,TD_QUERY_TYPE_YES) && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type)
+ {
+ if(set_td_data(mdi, frg))
+ {
+ FLAG_SET(mdi->td_query,TD_QUERY_TYPE_DEDUP);
+ }
+ }
+ /*�ڵ�һ�����ݰ�֮ǰ��Ҫ���ͽ�ĿԪ��Ϣ*/
+ if(!FLAG_TEST(mdi->flag, PROG_SEND_META))
+ {
+ FLAG_SET(frg->frag_flag, FRAG_FLAG_SEND_META);
+ FLAG_SET(mdi->flag, PROG_SEND_META);
+ }
+ /*�����Ƿ���Ҫ���͸���ϵͳ*/
+ if(FLAG_TEST(mdi->flag, PROG_FLAG_EXCP))
+ {
+ FLAG_SET(frg->frag_flag, FRAG_FLAG_WINS);
+ }
+ /*�����Ƿ��Դ�����͸�������ƿװ*/
+ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_MULTISRC))
+ {
+ FLAG_SET(frg->frag_flag, FRAG_FLAG_MULTISRC);
+ frg->multisrc_bizmanip = mdi->multisrc_bizmanip;
+ /*��Դ��mid�Ѿ����޸�*/
+ frg->new_mid = mdi->mid_after_multisrc;
+ }
+ if(g_frag_cfg.app_switch)
+ {
+ MESA_lqueue_join_tail(mdi->app_frg_lq, &frg, sizeof(frg));
+ atomic_inc(&frag_rssb.sysinfo_stat[RSSB_WAIT_QUEUE][QUEUE_IN]);
+ frag_write_to_log(ADD_FRAG_TO_APP_LQ, frg->mid, frg, NULL, 0);
+ }
+ /*trace debug*/
+ if(g_frag_cfg.store_filepath_switch)
+ {
+ trace_store_file(frg->mid, mdi->media_type, mdi->proto, frg->seq, frg->offset, frg->data, frg->datalen);
+ }
+}
+
+void copy_frg(frag_in_t* src, frag_in_t* dst)
+{
+ memcpy(src, dst, sizeof(frag_in_t));
+ src->data = (char*)malloc(dst->datalen);
+ memcpy(src->data, dst->data, dst->datalen);
+}
+
+int frag_add_tailq(media_t* mdi, frag_in_t* frg, int thread_seq)
+{
+ frag_in_t* frg_new = (frag_in_t*)malloc(sizeof(frag_in_t));
+ struct queue_item* item = NULL;
+
+ copy_frg(frg_new, frg);
+ item = (struct queue_item*)malloc(sizeof(struct queue_item));
+ item->node = (void*)frg_new;
+ item->thread_seq = thread_seq;
+ TAILQ_INSERT_TAIL(&mdi->query_wait_lq, item, entries);
+ atomic_inc(&g_frag_stat.sysinfo_stat[MULTISRC_QUEUE][QUEUE_IN]);
+ frag_write_to_log(ADD_FRAG_TO_TAILQ, frg_new->mid, frg_new, NULL, 0);
+ return 0;
+}
+
+/*���ض�Դ�ȴ�����,����ͬʱ����ش������Բ���free*/
+int frag_add_query_wait_lq(media_t* mdi, frag_ivi_info_t* frag_ivi_info, uint32_t frag_stat)
+{
+ frag_in_t* frg = frag_ivi_info->frg;
+
+ if(1==frag_stat)
+ {
+ frag_add_tailq(mdi, frg, frag_ivi_info->thread_seq);
+ }
+ else if(frag_stat==2)
+ {
+ frag_in_t* frg_new = NULL;
+ for(int i=0;i<frag_ivi_info->frg_array_cnt;i++)
+ {
+ frg_new = frag_ivi_info->frg_array[i];
+ frag_add_tailq(mdi, frg_new, frag_ivi_info->thread_seq);
+ }
+ }
+ return 0;
+}
+
+long media_preproc_cb(void *data, const uint8_t *key, uint size, void *user_arg)
+{
+ int frag_stat = 0;
+ media_t* mdi = (media_t*)data;
+ frag_ivi_info_t* frag_ivi_info = (frag_ivi_info_t*)user_arg;
+ frag_unit_t* frg_unit = frag_ivi_info->frg_unit;
+ frag_in_t* frg = frag_ivi_info->frg;
+
+ if(NULL==mdi)
+ {
+ return -1;
+ }
+
+ /*��Ŀ�յ������ݳ���ͳ��*/
+ mdi->pkt_in++;
+ mdi->byte_in += frg->datalen;
+
+ /*���м�����ã�������ȥ��֮ǰд�ļ�*/
+ if(g_frag_cfg.monitor_file_switch && FLAG_TEST(mdi->flag, PROG_FLAG_DUMP))
+ {
+ monitor_service_dump_file(frg, mdi);
+ }
+
+ /*�����Ŀ�ظ�������Ҫ����*/
+ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_DEDUP) && !g_frag_cfg.dedup_invalid)
+ {
+ frag_ivi_info->td_query = mdi->td_query;
+ return 1;
+ }
+
+ /*offset==0, log*/
+ if(0==frg->offset && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type)
+ {
+ update_special_media_service(mdi, frg);
+ /*
+ if(!FLAG_TEST(mdi->flag, PROG_OFFSET_ZERO))
+ {
+ atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_OFFSET_ZERO]);
+ FLAG_SET(mdi->flag, PROG_OFFSET_ZERO);
+ }
+ create_media_write_to_log(mdi, MEDIA_OFFSET_ZERO, NULL);
+ */
+ }
+ /*record maxoffset*/
+ //mdi->maxoffset = MAX(frg->offset, mdi->maxoffset);
+
+ /*IVI ȥ��*/
+ if(NULL!=frg)
+ {
+ frag_stat = media_removal(mdi, frg_unit, frg, frag_ivi_info);
+ /*keep frag*/
+ if(frag_stat==1)
+ {
+ tag_frag_in_media(mdi, frg);
+ media_byte_stat(mdi, frg_unit, frg);
+ }
+ /*new frag*/
+ else if(frag_stat==2)
+ {
+ /*every frag set info same with frag*/
+ for(int i=0;i<frag_ivi_info->frg_array_cnt;i++)
+ {
+ frag_ivi_info->frg_array[i]->mid = frg->mid;
+ frag_ivi_info->frg_array[i]->pid = frg->pid;
+ frag_ivi_info->frg_array[i]->offset_in = frag_ivi_info->frg_array[i]->offset;
+ frag_ivi_info->frg_array[i]->seq = frg->seq;
+ frag_ivi_info->frg_array[i]->thread_seq = frg->thread_seq;
+ tag_frag_in_media(mdi, frag_ivi_info->frg_array[i]);
+ media_byte_stat(mdi, frg_unit, frag_ivi_info->frg_array[i]);
+ }
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ /*offset==0, log*/
+ if(0==frg->offset)
+ {
+ if(!FLAG_TEST(mdi->flag, PROG_OFFSET_ZERO))
+ {
+ atomic_inc(&g_frag_stat.media_stat[LOG_MEDIA_OFFSET_ZERO]);
+ FLAG_SET(mdi->flag, PROG_OFFSET_ZERO);
+ }
+ create_media_write_to_log(mdi, MEDIA_OFFSET_ZERO, NULL);
+ }
+ /*record maxoffset*/
+ mdi->maxoffset = MAX(frg->offset, mdi->maxoffset);
+
+ /*���Թ���:�洢��Ч�ļ�dmj*/
+ if(g_frag_cfg.save_media)
+ {
+ if(g_frag_cfg.cpz_type == CPZ_VOIP)
+ {
+ uint32_t datalen = frg->datalen - sizeof(voip_header_t);
+ char* data = (char*)calloc(1,datalen);
+ memcpy(data,frg->data + sizeof(voip_header_t),datalen);
+ IVI_seg_t* a_ivi_seg = IVI_seg_malloc(frg->offset, frg->offset + datalen - 1, data);
+ IVI_insert(mdi->save_ivi,a_ivi_seg);
+ }
+ else
+ {
+ char* data = (char*)calloc(1,frg->datalen);
+ memcpy(data,frg->data,frg->datalen);
+ IVI_seg_t* a_ivi_seg = IVI_seg_malloc(frg->offset, frg->offset + frg->datalen-1, data);
+ IVI_insert(mdi->save_ivi,a_ivi_seg);
+ }
+ }
+
+ /*��Ŀ���ض�Դ��ѯ֮ǰ������׼��*/
+ if(g_frag_cfg.av_dedup_switch && MEDIA_SERVICE_TYPE_AV==mdi->media_service_type)
+ {
+ proc_media_multisrc(mdi, 0);
+ if(!FLAG_TEST(mdi->td_query, TD_QUERY_ACK_MULTISRC))
+ {
+ /*��Ϊ��Դ��Ҫ���ݻ��棬ͬʱ��������˷���*/
+ frag_add_query_wait_lq(mdi, frag_ivi_info, frag_stat);
+ }
+ frag_ivi_info->td_query = mdi->td_query;
+ }
+ return (long)frag_stat;
+}
+
+int media_preproc(frag_ivi_info_t* frag_ivi_info)
+{
+ long rec_cb = -1;
+ frag_in_t* frg = frag_ivi_info->frg;
+
+ /*���ԣ�ʹ�ü�鶨ʱ����ʱ��̭*/
+ /*
+ if(g_frag_cfg.av_dedup_switch && g_frag_run.multisrc_timer[frag_ivi_info->thread_seq])
+ {
+ MESA_timer_check(g_frag_run.multisrc_timer[frag_ivi_info->thread_seq], time(NULL), g_frag_cfg.multisrc_timer_cb_maxtime);
+ }
+ */
+
+ if(0!=frg->mid)
+ {
+ MESA_htable_search_cb(g_frag_run.media_hash,(const uint8_t *)&frg->mid,sizeof(frg->mid),
+ media_preproc_cb,(void*)frag_ivi_info,&rec_cb);
+ }
+ return (int)rec_cb;
+}
+
+/*���������У����Զ�η���*/
+ void frag_try_add_wait_lq(uint8_t td_query, frag_in_t* frg, int thread_seq)
+ {
+ int lq_rec = 0;
+ //int try_num = 0;
+
+ atomic_inc(&g_frag_stat.stat_info[TO_SEND][TOTAL_PKTS]);
+ atomic_add(&g_frag_stat.stat_info[TO_SEND][TOTAL_BYTES], frg->datalen);
+ if(FLAG_TEST(td_query, TD_QUERY_RES_DEDUP))
+ {
+ atomic_inc(&g_frag_stat.stat_info[DEDUP_DROP][TOTAL_PKTS]);
+ atomic_add(&g_frag_stat.stat_info[DEDUP_DROP][TOTAL_BYTES], frg->datalen);
+ }
+ if(!FLAG_TEST(td_query, TD_QUERY_RES_DEDUP) || g_frag_cfg.dedup_invalid)
+ {
+ lq_rec = MESA_lqueue_join_tail(frag_rssb.wait_lq[thread_seq], &frg, sizeof(frg));
+ if(lq_rec==MESA_QUEUE_RET_OK)
+ {
+ frag_write_to_log(ADD_FRAG_TO_WAIT_LQ, frg->mid, frg, NULL, 0);
+ atomic_inc(&frag_rssb.sysinfo_stat[RSSB_WAIT_QUEUE][QUEUE_IN]);
+ }
+ else
+ {
+ atomic_inc(&g_frag_stat.stat_info[WAIT_QUEUE_FULL_DROP][TOTAL_PKTS]);
+ atomic_add(&g_frag_stat.stat_info[WAIT_QUEUE_FULL_DROP][TOTAL_BYTES], frg->datalen);
+ free_frag_in(frg,0,NULL);
+ }
+ }
+ else
+ {
+ free_frag_in(frg,0,NULL);
+ }
+ }
+
+/*���Ͷ���*/
+int frag_add_wait_lq(frag_ivi_info_t* frag_ivi_info, uint32_t frag_stat, int thread_seq)
+{
+ frag_in_t* frg = frag_ivi_info->frg;
+
+ /*rec=1: IVI ȥ��ֻ��һ��frag rec=2:IVIȥ��ֻ������frag*/
+ if(1==frag_stat)
+ {
+ frag_try_add_wait_lq(frag_ivi_info->td_query, frg, thread_seq);
+ }
+ else if(frag_stat==2)
+ {
+ frag_in_t* frg_new = NULL;
+ for(int i=0;i<frag_ivi_info->frg_array_cnt;i++)
+ {
+ frg_new = frag_ivi_info->frg_array[i];
+ frag_try_add_wait_lq(frag_ivi_info->td_query, frg_new, thread_seq);
+ frag_ivi_info->frg_array[i] = NULL;
+ }
+ free_frag_in(frg,0,NULL);
+ }
+ else
+ {
+ free_frag_in(frg,0,NULL);
+ }
+ return 0;
+}
+
+/*����ֵ��-1��*/
+int frag_service(frag_ivi_info_t* frag_ivi_info, uint32_t src_ip, int thread_seq)
+{
+ int frag_stat = 0;
+
+ frag_ivi_info->thread_seq = thread_seq;
+ frag_ivi_info->mid = frag_ivi_info->frg->mid;
+
+ /*��Ƭ���������ظ����ݲ��ٴ���*/
+ if(frag_ivi_info->frg_unit!=NULL && frag_ivi_info->frg_unit->repeat_not_proc)
+ {
+ atomic_inc(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_PKTS]);
+ atomic_add(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_BYTES],frag_ivi_info->frg->datalen);
+ return -1;
+ }
+
+ /*create media, because no cnvg and no index*/
+ frag_stat = media_preproc(frag_ivi_info);
+ if(-1==frag_stat)
+ {
+ /*û���ҵ���Ŀ��������û��Ԫ��Ϣ*/
+ frag_write_to_log(MEDIA_NO_META, frag_ivi_info->frg->pid, NULL, NULL, 0);
+ atomic_inc(&g_frag_stat.stat_info[MEDIA_NOMETA][TOTAL_PKTS]);
+ atomic_add(&g_frag_stat.stat_info[MEDIA_NOMETA][TOTAL_BYTES],frag_ivi_info->frg->datalen);
+ return -1;
+ }
+ /*��Ƭ����������:����У�����*/
+ if(!g_frag_cfg.app_switch)
+ {
+ frag_add_wait_lq(frag_ivi_info, frag_stat, thread_seq);
+ }
+ return 0;
+}
+
+long converge_data_search_cb(void *data, const uint8_t *key, uint size, void *user_arg)
+{
+ frag_ivi_info_t* frag_ivi_info = (frag_ivi_info_t*)user_arg;
+ frag_unit_t* frg_unit = (frag_unit_t*)data;
+ frag_in_t* frg = frag_ivi_info->frg;
+
+ if(NULL==frg_unit)
+ {
+ return -1;
+ }
+
+ frag_ivi_info->frg_unit = frg_unit;
+
+ /*��Ƭ���������������IJ��ٴ����������ﶪ�������ٲ��Ŀ�Ĵ���*/
+ if(frg_unit->repeat_not_proc)
+ {
+ atomic_inc(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_PKTS]);
+ atomic_add(&g_frag_stat.stat_info[FRAG_DEDUP][TOTAL_BYTES],frg->datalen);
+ free_frag_in(frg,0,NULL);
+ return 0;
+ }
+
+ /*write log*/
+ frag_write_to_log(ADD_FRAG, frg->pid, frg, NULL, 0);
+
+ if(frg_unit->frag_state==STAT_OK)
+ {
+ frg->mid = frg_unit->mid;
+ return 1;
+ }
+
+ if(frg_unit->frag_state==STAT_CNVG_QUERY)
+ {
+ /*join frag to cnvg_lq*/
+ MESA_lqueue_join_tail(frg_unit->frg_cnvg_lq, &frg, sizeof(frg));
+ /*write log*/
+ frag_write_to_log(ADD_FRAG_TO_CNVG_LQ, frg->mid, frg, NULL, 0);
+ atomic_inc(&frag_rssb.sysinfo_stat[RSSB_CNVG_QUEUE][QUEUE_IN]);
+ }
+ else if(frg_unit->frag_state==STAT_INDEX_QUERY)
+ {
+ MESA_lqueue_join_tail(frg_unit->frg_index_lq, &frg, sizeof(frg));
+ frag_write_to_log(ADD_FRAG_TO_INDEX_LQ, frg->mid, frg, NULL, 0);
+ atomic_inc(&frag_rssb.sysinfo_stat[RSSB_INDEX_QUEUE][QUEUE_IN]);
+ }
+ return 0;
+}
+
+int add_frag(uint64_t pid, uint64_t offset, char* data, uint32_t datalen, uint8_t protocol, uint32_t src_ip, int thread_seq)
+{
+ frag_ivi_info_t* frag_ivi_info = (frag_ivi_info_t*)calloc(1, sizeof(frag_ivi_info_t));
+ frag_in_t* frg = NULL;
+ long rec = 0;
+
+ frg = (frag_in_t*)calloc(1, sizeof(frag_in_t));
+ frg->mid = pid;
+ frg->pid = pid;
+ frg->offset = offset;
+ frg->datalen = datalen;
+ frg->data = (char*)malloc(frg->datalen);
+ memcpy(frg->data, data, datalen);
+ frg->thread_seq = thread_seq;
+ frg->src_ip = src_ip;
+
+ /*������Ӧ��*/
+ send_ack_to_qd(frg, src_ip, thread_seq);
+
+ frag_ivi_info->frg = frg;
+ MESA_htable_search_cb(frag_rssb.converge_hash, (const uint8_t *)&pid, sizeof(pid),
+ converge_data_search_cb, (void*)frag_ivi_info, &rec);
+ /*��Ƭ����������׼������*/
+ if(1==rec)
+ {
+ rec=frag_service(frag_ivi_info, src_ip, thread_seq);
+ if(-1==rec)
+ {
+ free_frag_in(frg,0,NULL);
+ }
+ }
+ /*��ͳ��Ŀ*/
+ else if(-1==rec)
+ {
+ rec = frag_service(frag_ivi_info, src_ip, thread_seq);
+ /*û��Ԫ��Ϣ,�ͷ�*/
+ if(-1==rec)
+ {
+ free_frag_in(frg,0,NULL);
+ }
+ }
+ free(frag_ivi_info);
+ return rec;
+}
+
+/*return
+* -1:��Ŀ������
+* 0:������Ԫ��Ϣ
+* 1:����Ԫ��Ϣ
+*/
+long get_media(void *data, const uint8_t *key, uint size, void *user_arg)
+{
+ media_t* mdi = (media_t*)data;
+ media_info_t* media_info = (media_info_t*)user_arg;
+ int opt_num = 0;
+ if(NULL!=mdi)
+ {
+ media_info->td_query = mdi->td_query;
+ media_info->mdi_flag = mdi->flag;
+ /*media_len*//*mid,protocol,mediatype,file_name*/
+ media_info->prog_len = mdi->media_len;
+ media_info->mid = mdi->mid_after_multisrc;
+ media_info->protocol = mdi->proto;
+ media_info->data_flag = mdi->data_flag;
+ media_info->flag = mdi->meta_flag;
+ /*��Դ��������۴�ƴװ�ϲ�����ɢ����ʶ*/
+ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_MULTISRC))
+ {
+ media_info->hitservice = 0;
+ }
+ else
+ {
+ media_info->hitservice = mdi->hit_service;
+ }
+ /*����ͳ��for data*/
+ /*����HLS OSMF��������ƬЭ��media_type����Ϊ��ͳ��Ŀ��mediatype*/
+ if(mdi->media_type==FILE_FRAG)
+ {
+ media_info->media_type = FILE_AV;
+ media_info->prog_len = 0;
+ }
+ else
+ {
+ media_info->media_type = mdi->media_type;
+ }
+ if(g_frag_cfg.modify_capIP_switch)
+ {
+ media_info->cap_IP = g_frag_cfg.local_ip_nr;
+ }
+ else
+ {
+ media_info->cap_IP = mdi->qd_info[0].cap_ip;
+ }
+
+ /*��Դ������������ƴ��װ����Ҫ����ѡ��*/
+ if(FLAG_TEST(mdi->td_query, TD_QUERY_RES_MULTISRC))
+ {
+ /*������ƿװ��ѡ��*/
+ if(NULL!=mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index])
+ {
+ media_info->opt_unit = (struct opt_unit_t*)calloc(2+mdi->qdinfo_idx_last, sizeof(struct opt_unit_t));
+ /*META_OPT_LAYER_ADDR*/
+ media_info->opt_unit[0].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len;
+ media_info->opt_unit[0].opt_type = META_OPT_LAYER_ADDR;
+ media_info->opt_unit[0].opt_value = (char*)calloc(1, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len);
+ memcpy(media_info->opt_unit[0].opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len);
+ opt_num++;
+
+ /*META_OPT_LAYER_URL*/
+ media_info->opt_unit[1].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_len;
+ media_info->opt_unit[1].opt_type = META_OPT_LAYER_URL;
+ media_info->opt_unit[1].opt_value = (char*)calloc(1, mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_len);
+ memcpy(media_info->opt_unit[1].opt_value, mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_URL][mdi->url_opt_index]->opt_len);
+ opt_num++;
+ }
+ else
+ {
+ media_info->opt_unit = (struct opt_unit_t*)calloc(1+mdi->qdinfo_idx_last, sizeof(struct opt_unit_t));
+ /*META_OPT_LAYER_ADDR*/
+ media_info->opt_unit[0].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len;
+ media_info->opt_unit[0].opt_type = META_OPT_LAYER_ADDR;
+ media_info->opt_unit[0].opt_value = (char*)calloc(1, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len);
+ memcpy(media_info->opt_unit[0].opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_value, mdi->opt[MEDIA_OPT_ADDR][mdi->url_opt_index]->opt_len);
+ opt_num++;
+ }
+
+ /*META_OPT_MID_BEFORE_MULTISRC*/
+ for(int i=0;i<mdi->qdinfo_idx_last;i++)
+ {
+ media_info->opt_unit[2+i].opt_len = sizeof(uint32_t)+sizeof(uint8_t)+sizeof(uint64_t)+sizeof(uint32_t);
+ media_info->opt_unit[2+i].opt_type = META_OPT_INFO_BEFORE_MULTISRC;
+ media_info->opt_unit[2+i].opt_value = (char*)calloc(1, sizeof(uint64_t)+sizeof(uint32_t));
+ memcpy(media_info->opt_unit[2+i].opt_value, &mdi->qd_info[i].mid, sizeof(uint64_t));
+ memcpy(media_info->opt_unit[2+i].opt_value+sizeof(uint64_t), &mdi->qd_info[i].cap_ip, sizeof(uint32_t));
+ opt_num++;
+ }
+ }
+ else
+ {
+ /*SIP ѡ��*/
+ if(mdi->proto==AV_PROTOCOL_SIP && mdi->sip_rate_info!=NULL)
+ {
+ media_info->opt_unit = (struct opt_unit_t*)calloc(1, sizeof(struct opt_unit_t));
+ media_info->opt_unit->opt_len = sizeof(uint32_t)+sizeof(uint8_t)+mdi->sip_rate_info->opt_len;
+ media_info->opt_unit->opt_type = META_OPT_SIP_SEND_RATE_INFO;
+ media_info->opt_unit->opt_value = (char*)calloc(1, mdi->sip_rate_info->opt_len);
+ memcpy(media_info->opt_unit->opt_value, mdi->sip_rate_info->opt_value, mdi->sip_rate_info->opt_len);
+ opt_num++;
+ }
+#if K_PROJECT
+#else
+ if(mdi->proto!=AV_PROTOCOL_SIP)
+ {
+ /*�������ݷ�������ѡ��*/
+ media_info->opt_unit = (struct opt_unit_t*)calloc(1, sizeof(struct opt_unit_t));
+ media_info->opt_unit->opt_len = sizeof(uint32_t)+sizeof(uint8_t)+sizeof(char*);
+ media_info->opt_unit->opt_type = OPT_SOURCE_IP;
+ media_info->opt_unit->opt_value = (char*)calloc(1, sizeof(char*));
+ *(unsigned int*)(media_info->opt_unit->opt_value) = g_frag_cfg.local_ip_nr;
+ opt_num++;
+ }
+#endif
+ }
+ media_info->opt_num = opt_num;
+ return 1;
+ }
+ return 0;
+}
+
+void frag_reassembly_stats_init(const char* logdir, const char* filename)
+{
+ char log[MAX_PATH_LEN]={0};
+ int value = 0;
+ char conf_buf[MAX_PATH_LEN]={0};
+
+ memset(conf_buf, 0, sizeof(conf_buf));
+ frag_rssb.stat_handle = FS_create_handle();
+ MESA_load_profile_string_def(filename, "LOG", "StatFile", conf_buf, sizeof(conf_buf),"./log/frag_rssb/frag_reassembly_stat.log");
+ snprintf(log,sizeof(log),"%s/%s", logdir,conf_buf);
+ FS_set_para(frag_rssb.stat_handle, OUTPUT_DEVICE, log, strlen(log)+1);
+ value = 1;//flush by date
+ FS_set_para(frag_rssb.stat_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ value = 2;//append
+ FS_set_para(frag_rssb.stat_handle, PRINT_MODE, &value, sizeof(value));
+ MESA_load_profile_short_def(filename, "LOG", "StatInterval", (short*)&frag_rssb.stat_interval,0);
+ FS_set_para(frag_rssb.stat_handle, STAT_CYCLE, &frag_rssb.stat_interval, sizeof(frag_rssb.stat_interval));
+ value = (frag_rssb.stat_interval!=0) ? 1 : 0;
+ FS_set_para(frag_rssb.stat_handle, PRINT_TRIGGER, &value, sizeof(value));
+ value = 0;
+ FS_set_para(frag_rssb.stat_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(frag_rssb.stat_handle, APP_NAME, g_frag_stat.fs_app, strlen(g_frag_stat.fs_app)+1);
+ if(g_frag_stat.fs_remote_switch)
+ {
+ FS_set_para(frag_rssb.stat_handle, STATS_SERVER_IP, g_frag_stat.fs_ip, strlen(g_frag_stat.fs_ip)+1);
+ FS_set_para(frag_rssb.stat_handle, STATS_SERVER_PORT, &g_frag_stat.fs_port, sizeof(g_frag_stat.fs_port));
+ }
+
+ memset(conf_buf, 0, sizeof(conf_buf));
+ frag_rssb.sysinfo_handle = FS_create_handle();
+ MESA_load_profile_string_def(filename, "LOG", "SysinfoFile", conf_buf, sizeof(conf_buf),"./log/frag_rssb/frag_reassembly_sysinfo.log");
+ snprintf(log,sizeof(log),"%s/%s", logdir,conf_buf);
+ FS_set_para(frag_rssb.sysinfo_handle, OUTPUT_DEVICE, log, strlen(log)+1);
+ value = 1;//flush by date
+ FS_set_para(frag_rssb.sysinfo_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ value = 2;//append
+ FS_set_para(frag_rssb.sysinfo_handle, PRINT_MODE, &value, sizeof(value));
+ MESA_load_profile_short_def(filename, "LOG", "SysinfoInterval", (short*)&frag_rssb.sysinfo_interval,0);
+ FS_set_para(frag_rssb.sysinfo_handle, STAT_CYCLE, &frag_rssb.sysinfo_interval, sizeof(frag_rssb.sysinfo_interval));
+ value = (frag_rssb.sysinfo_interval!=0) ? 1 : 0;
+ FS_set_para(frag_rssb.sysinfo_handle, PRINT_TRIGGER, &value, sizeof(value));
+ value = 0;
+ FS_set_para(frag_rssb.sysinfo_handle, CREATE_THREAD, &value, sizeof(value));
+
+}
+
+int read_frag_reassembly_conf(int thread_num, const char* logdir, const char* filename, int* maat_stat_swicth, int* maat_perf_swicth)
+{
+ int log_level = 0;
+ char log_name[MAX_PATH_LEN]={0};
+ char log[MAX_PATH_LEN]={0};
+
+ /*frag reassembly log*/
+ MESA_load_profile_short_def(filename, "LOG", "FragReassemblyLogLevel", (short*)&frag_rssb.logger_level,10);
+ MESA_load_profile_string_def(filename, "LOG", "FragReassemblyLogName", log_name , sizeof(log_name),"./log/frag_rssb/frag_reassembly.log");
+ snprintf(log,sizeof(log),"%s/%s", logdir,log_name);
+ frag_rssb.logger = MESA_create_runtime_log_handle(log,frag_rssb.logger_level);
+ if(NULL==frag_rssb.logger)
+ {
+ printf("read_frag_reassembly_conf: get FragReassemblyLog logger error.\n");
+ return -1;
+ }
+
+ /*maat stat log*/
+ MESA_load_profile_int_def(filename, "LOG", "MaatStatSwitch", maat_stat_swicth, 0);
+ MESA_load_profile_int_def(filename, "LOG", "MaatPerfSwitch", maat_perf_swicth, 0);
+
+ /*media log*/
+ memset(log, 0, sizeof(log));
+ memset(log_name, 0, sizeof(log_name));
+ MESA_load_profile_short_def(filename, "LOG", "MediaLogLevel", (short*)&log_level,30);
+ MESA_load_profile_string_def(filename, "LOG", "MediaLogName", log_name , sizeof(log_name),"./log/frag_rssb/frag_reassembly_media.log");
+ snprintf(log,sizeof(log),"%s/%s", logdir,log_name);
+ frag_rssb.media_logger = MESA_create_runtime_log_handle(log,log_level);
+ if(NULL==frag_rssb.media_logger)
+ {
+ printf("read_frag_reassembly_conf: get MediaLog media_logger error.\n");
+ return -1;
+ }
+
+ /*sysinfo log*/
+ frag_reassembly_stats_init(logdir, filename);
+
+ /*redis cluster init*/
+ int timeout = 0;
+ MESA_load_profile_int_def(filename, "NETWORK", "RedisClusterSwitch", &frag_rssb.redis_cluster_switch, 1);
+ MESA_load_profile_int_def(filename, "NETWORK", "RedisTimeout", &timeout, 5);
+
+ MESA_load_profile_string_nodef(filename, "NETWORK", "RedisBrokers", frag_rssb.redis_addr, sizeof(frag_rssb.redis_addr));
+ //MESA_load_profile_string_nodef(filename, "NETWORK", "RedisNet", frag_rssb.redis_cluster_net, sizeof(frag_rssb.redis_cluster_net));
+ //MESA_load_profile_int_def(filename, "NETWORK", "RedisUniqIPFlag", &frag_rssb.redis_cluster_netflag, 512);
+
+ MESA_load_profile_string_nodef(filename, "NETWORK", "RedisIP", frag_rssb.redis_ip, sizeof(frag_rssb.redis_ip));
+ MESA_load_profile_int_def(filename, "NETWORK", "RedisPort", &frag_rssb.redis_port, 6379);
+ frag_rssb.redis_tv.tv_sec = timeout;
+ frag_rssb.redis_tv.tv_usec = 0;
+
+ /*wait queue num*/
+ MESA_load_profile_uint_def(filename, "SYSTEM", "WaitQueueNum", &frag_rssb.wait_lq_num, 5000000);
+
+ /*init hash size*/
+ MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashThreadSafe", &frag_rssb.cnvg_hash_thread_safe, 64);
+ MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashSize", &frag_rssb.cnvg_hash_max_elem_num, 1024*1024);
+ MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashElemNum", &frag_rssb.cnvg_hash_size, 1024*1024*16);
+ MESA_load_profile_uint_def(filename, "SYSTEM", "ConvergeHashExpireTime", &frag_rssb.cnvg_hash_expire_time, 120);
+
+ return 0;
+}
+
+void frag_reassembly_release()
+{
+ int i=0;
+ if(NULL!=frag_rssb.sifter)
+ {
+ destroy_sifter(frag_rssb.sifter);
+ }
+
+ if(NULL!=frag_rssb.converge_hash)
+ {
+ MESA_htable_destroy(frag_rssb.converge_hash, free_frag_unit);
+ }
+
+ for(i=0;i<frag_rssb.lq_num;i++)
+ {
+ if(NULL!=frag_rssb.wait_lq[i])
+ {
+ MESA_lqueue_destroy(frag_rssb.wait_lq[i], free_frag_in, NULL);
+ }
+ }
+}
+
+int frag_reassembly_init(const char* frag_rssb_cfg_dir, const char* frag_rssb_log_dir, int thread_num)
+{
+ memset(&frag_rssb, 0, sizeof(frag_reassembly_t));
+ char config_buff[MAX_PATH_LEN] = {0};
+
+ /*frag_reassembly conf*/
+ int maat_stat_swicth = 0;
+ int maat_perf_swicth = 0;
+ snprintf(config_buff, sizeof(config_buff), "%s/%s", frag_rssb_cfg_dir, "frag_reassembly.conf");
+ if(-1==read_frag_reassembly_conf(thread_num, frag_rssb_log_dir, config_buff, &maat_stat_swicth, &maat_perf_swicth))
+ {
+ printf("read_frag_reassembly_conf error.\n");
+ frag_reassembly_release();
+ return -1;
+ }
+
+ /*tpl conf*/
+ snprintf(config_buff, sizeof(config_buff), "%s/%s", frag_rssb_cfg_dir, "sifter/");
+ frag_rssb.sifter = create_sifter(config_buff, frag_rssb_log_dir, maat_stat_swicth, maat_perf_swicth, thread_num, frag_rssb.logger);
+ if(NULL==frag_rssb.sifter)
+ {
+ printf("create_sifter error.\n");
+ MESA_handle_runtime_log(frag_rssb.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} create_sifter error", __FILE__,__LINE__);
+ frag_reassembly_release();
+ return -1;
+ }
+
+ /*converge_hash*/
+ MESA_htable_create_args_t hash_args;
+ memset(&hash_args,0,sizeof(MESA_htable_create_args_t));
+ hash_args.thread_safe = frag_rssb.cnvg_hash_thread_safe; //group lock
+ hash_args.recursive = 1;
+ hash_args.hash_slot_size = frag_rssb.cnvg_hash_size;
+ hash_args.max_elem_num = frag_rssb.cnvg_hash_max_elem_num;
+ hash_args.eliminate_type = HASH_ELIMINATE_ALGO_LRU;
+ hash_args.expire_time = frag_rssb.cnvg_hash_expire_time;
+ hash_args.key_comp = NULL;
+ hash_args.key2index = NULL;
+ hash_args.data_free = free_frag_unit;
+ hash_args.data_expire_with_condition = expire_cnvg_hash_node;
+ frag_rssb.converge_hash = MESA_htable_create(&hash_args, sizeof(hash_args));
+ if(NULL==frag_rssb.converge_hash)
+ {
+ frag_reassembly_release();
+ printf("create converge_hash error.\n");
+ MESA_handle_runtime_log(frag_rssb.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ "{%s:%d} create converge_hash error", __FILE__,__LINE__);
+ return -1;
+ }
+
+ /*wait_lq for get_frag*/
+ frag_rssb.lq_num = thread_num;
+ frag_rssb.wait_lq = (MESA_lqueue_head*)calloc(1, frag_rssb.lq_num*sizeof(MESA_lqueue_head));
+ for(int i=0;i<frag_rssb.lq_num;i++)
+ {
+ frag_rssb.wait_lq[i] = MESA_lqueue_create(1, frag_rssb.wait_lq_num);
+ }
+
+ /*stat log*/
+ if(0<frag_rssb.stat_interval)
+ {
+ if(-1 == create_pthread(thread_frag_rssb_stat_output,NULL,frag_rssb.logger))
+ {
+ MESA_handle_runtime_log(frag_rssb.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ (char*)"[%s:%d] thread thread_frag_rssb_stat_output create failed" ,__FILE__,__LINE__,FRAG_REASSEMBLY_MODULE_NAME);
+ return -1;
+ }
+ }
+
+ /*sysinfo log*/
+ if(0<frag_rssb.sysinfo_interval)
+ {
+ if(-1 == create_pthread(thread_frag_rssb_sysinfo_output,NULL,frag_rssb.logger))
+ {
+ MESA_handle_runtime_log(frag_rssb.logger, RLOG_LV_FATAL, FRAG_REASSEMBLY_MODULE_NAME,
+ (char*)"[%s:%d] thread thread_frag_rssb_sysinfo_output create failed" ,__FILE__,__LINE__,FRAG_REASSEMBLY_MODULE_NAME);
+ return -1;
+ }
+ }
+ return 0;
+}
+