#include #include #include "producer.h" static int update_valid_condition(int is_valid,int *valid_msg_num, int *invalid_msg_num, void * logger) { if(is_valid == 1) { (*valid_msg_num)++; } else if(is_valid == 0) { (*invalid_msg_num)++; } else { MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL, "read_determinant_table_file", "valid %d", is_valid); return -1; } return 0; } void *producer_produce_work(void *p) { producer_t *producer = (producer_t*)p; if(producer->producer_source == LOCAL_FILE) producer_produce_msg_from_local_file(producer); /*else if(producer->producer_source == ORACLE_S) producer_produce_msg_from_oracle(producer); else if(producer->producer_source == MYSQL_S) producer_produce_msg_from_mysql(producer); else if(producer->producer_source == RESTFUL_S) producer_produce_msg_from_restful(producer); */ return NULL; } int producer_produce_msg_from_local_file(producer_t * producer) { int ret =0; int meta_type = UPDATE_NONE; int valid_msg_num = 0,invalid_msg_num = 0,error_msg_num = 0,total_msg_num = 0; common_module_t *common = producer->common; if(common->relibility_switch == TENSOR_REL_ON) { tensor_redis_state_check(common); } while (1) { char *p = NULL; char index_name_path[256]; //undo : persist processed file switch(common->pz_type) { case PZ_TYPE_DETERMINANT:// row/column type. meta_type = producer->get_determinant_index_path(index_name_path, sizeof(index_name_path)); break; case PZ_TYPE_NOT_DETERMINANT:// non row/column type. meta_type = producer->get_indeterminant_index_path(index_name_path, sizeof(index_name_path)); break; } p = strrchr(index_name_path, '.'); producer->producer_profile.version = atol(p+1); if(meta_type == UPDATE_NONE) { sleep (producer->producer_profile.read_file_interval); continue; } else { valid_msg_num = 0; invalid_msg_num = 0; error_msg_num = 0; total_msg_num = 0; MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_INFO, "[READ_FILE]", "files :%s updates", index_name_path); //get valid and invalid nums for malloc maat_line_t to write redis,this is maat-redis's fault //we read index file twice to solve the problem of memory dissipation MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_DEBUG, "[READ_FILE]", "get_valid_condition start"); ret = producer->analysis_index_get_valid_condition(&valid_msg_num, &invalid_msg_num,&error_msg_num,index_name_path); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_DEBUG, "[READ_FILE]", "get_valid_condition end"); if(ret < 0 ) { MESA_HANDLE_RUNTIME_LOG (common->error_logger, RLOG_LV_INFO, "[READ_FILE_GET_VALID_CONDITION]", "read file %s to get_valid_condition failed,skip this file.",index_name_path); MESA_HANDLE_RUNTIME_LOG (common->main_logger, RLOG_LV_INFO, "[READ_FILE_GET_VALID_CONDITION]", "business %s read file %s to get_valid_condition failed,please look at log/%s,skip this file." ,common->local_state_for_manager->business_name ,index_name_path ,common->local_state_for_manager->business_name); continue; } assert(ret >=0); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_DEBUG, "[READ_FILE_GET_VALID_CONDITION]", "analysis_index_get_valid_condition success"); FS_operate(common->stat_handle,common->fs_status_id[STATUS_VERSION],0,FS_OP_SET,producer->producer_profile.version); FS_operate(common->stat_handle,common->fs_status_id[STATUS_VALID_NUM],0,FS_OP_SET,valid_msg_num); FS_operate(common->stat_handle,common->fs_status_id[STATUS_INVALID_NUM],0,FS_OP_SET,invalid_msg_num); FS_operate(common->stat_handle,common->fs_status_id[STATUS_ERROR_NUM],0,FS_OP_SET,error_msg_num); total_msg_num = valid_msg_num+invalid_msg_num + error_msg_num; FS_operate(common->stat_handle,common->fs_status_id[STATUS_TOTAL_NUM],0,FS_OP_SET,total_msg_num); //analysize every table file to write redis MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_DEBUG, "[READ_FILE]", "analysis_index_write_redis_fdfs start"); if(ret >= 0) ret = producer->analysis_index_write_redis_fdfs(valid_msg_num, invalid_msg_num,index_name_path); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_DEBUG, "[READ_FILE]", "analysis_index_write_redis_fdfs end"); if(ret < 0 ) { MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]", "read file %s to write redis error,this version not add to redis.", index_name_path); MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]", "business %s read file %s to write redis error,please look at log/%s/runtimelog,skip this file." ,common->local_state_for_manager->business_name ,index_name_path ,common->local_state_for_manager->business_name); continue; } assert(ret >=0); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_INFO, "[READ_FILE_WRITE_REDIS]", "analysis_index_write_redis_fdfs success"); } //undo persisit the version of index //sscanf(,"%[a-zA-Z]_config_index.%u", filename, &producer_profile->version); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_INFO, "[READ_FILE]", "files :%s deal finish", index_name_path); //common->kvHandle->update_table_seq("full_config_index", producer->producer_profile.version); } return ret; } int producer_t::producer_init() { if (producer_load_profile() < 0) { return -1; } return 0; } int producer_t::producer_load_profile() { int ret = 0; int read_file_interval=0; char buf_dir[MAX_PATH_LENGTH] ; //int debug_flag = 0; ret = snprintf(buf_dir, MAX_PATH_LENGTH, "%s/%s", common->config_path, COMMON_CONFIG_NAME); if(ret == MAX_PATH_LENGTH) { MESA_HANDLE_RUNTIME_LOG(common->error_logger, RLOG_LV_FATAL, "[READ_PROFILE]", " dir exceed the max length!"); return -1; } ret = MESA_load_profile_int_def(buf_dir, "PRODUCER", "read_file_interval", &read_file_interval, 10); ret = MESA_load_profile_string_def(buf_dir, "PRODUCER", "cfg_full_path", producer_profile.cfg_full_path, MAX_PATH_LENGTH, NULL); ret = MESA_load_profile_string_def(buf_dir, "PRODUCER", "cfg_inc_path", producer_profile.cfg_inc_path, MAX_PATH_LENGTH, NULL); //producer->producer_profile.version = producer->common->kvHandle->get_table_seq("full_config_index"); producer_profile.version = 1; producer_profile.read_file_interval=read_file_interval; //undo producer_source = LOCAL_FILE; //printf MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[READ_PROFILE]","read config file %s -----", buf_dir); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[READ_PROFILE]","----- cfg_full_path: %s\n", producer_profile.cfg_full_path); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[READ_PROFILE]","----- cfg_inc_path: %s\n", producer_profile.cfg_inc_path); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[READ_PROFILE]","----- version: %d\n", producer_profile.version); MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[READ_PROFILE]","----- scan_interval: %d s\n", producer_profile.read_file_interval); if(common->debug_switch == DEBUG_SWITCH_ON) { printf ("read config file %s -----\n", buf_dir); printf ("----- version: %ld\n", producer_profile.version); printf ("----- scan_interval: %d s\n", producer_profile.read_file_interval); } assert(ret>=0); return ret; } int producer_t::producer_thread_run() { pthread_attr_t worker_thread_attr; pthread_t producer_worker; pthread_attr_init(&worker_thread_attr); pthread_attr_setdetachstate(&worker_thread_attr,PTHREAD_CREATE_DETACHED); pthread_create(&producer_worker, &worker_thread_attr,&producer_produce_work, (void*)this); return 0; } int producer_t::generate_user_info(char *user_info) { int ret = 0; int64_t now_time = common::Func::curr_time(); ret = snprintf(user_info,MAX_USER_INFO_LEN,"%ld",now_time); if (ret >= MAX_USER_INFO_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_USER_INFO]","user_info is too long, should not bigger than %d!", MAX_ONE_CONTROL_MSG_LEN); return -1; } return 0; } int producer_t::generate_dest_line_for_maat_format(void *line_info,char *dest_line_for_maat) { int ret = 0; char dest_msg_line[MAX_ONE_MSG_LEN] = {0}; char control_line_info[MAX_ONE_CONTROL_MSG_LEN] = {0}; determinant_line_info_t *determinant_line_info; indeterminant_line_info_t *indeterminant_line_info; memset(dest_line_for_maat,0,MAX_ONE_CONFIG_MSG_LINE_LEN); switch(common->pz_type) { case PZ_TYPE_DETERMINANT: determinant_line_info = (determinant_line_info_t *)line_info; //fromat:control-msg dest_writed_msg //control-msg:topic \t other-control-info //dest_writed_msg:filename \t is-valid and etc //modify by 2018-01-09 maat need a correct is-valid_column //fromat:line control-msg dest_writed_msg //generate control_line_info ret = snprintf(control_line_info,MAX_ONE_CONTROL_MSG_LEN,"%s\n",determinant_line_info->topic); if (ret >= MAX_ONE_CONTROL_MSG_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_CONTROL_LINE_INFO]","control_line_info too long, should not bigger than %d!", MAX_ONE_CONTROL_MSG_LEN); return -1; } //generate dest_msg_line ret = snprintf(dest_msg_line,MAX_ONE_MSG_LEN,"%s\t%s",determinant_line_info->dest_part_msg_line,determinant_line_info->user_info); if (ret >= MAX_ONE_MSG_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_DEST_MSG_LINE]","dest_msg_line too long, should not bigger than %d!", MAX_ONE_MSG_LEN); return -1; } //generate line ret = snprintf(dest_line_for_maat,MAX_ONE_CONFIG_MSG_LINE_LEN,"%s %s",dest_msg_line,control_line_info); if (ret >= MAX_ONE_CONFIG_MSG_LINE_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_DEST_LINE_FOR_MAAT]"," dest_line_for_maat too long, should not bigger than %d!", MAX_ONE_CONFIG_MSG_LINE_LEN); return -1; } break; case PZ_TYPE_NOT_DETERMINANT: //control-msg:fdfs_file_id,topic indeterminant_line_info = (indeterminant_line_info_t *)line_info; //generate control_line_info ret = snprintf(control_line_info,MAX_ONE_CONTROL_MSG_LEN,"%s\t%s\n" ,indeterminant_line_info->topic ,indeterminant_line_info->fdfs_file_id); if (ret >= MAX_ONE_CONTROL_MSG_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_CONTROL_LINE_INFO]"," control_line_info too long, should not bigger than %d!", MAX_ONE_CONTROL_MSG_LEN); return -1; } ret = generate_user_info(indeterminant_line_info->user_info); //generate dest_msg_line ret = snprintf(dest_msg_line,MAX_ONE_MSG_LEN,"%s\t%d\t%s\t%s\t%s" ,indeterminant_line_info->file_name ,indeterminant_line_info->is_valid ,indeterminant_line_info->file_path ,indeterminant_line_info->file_md5 ,indeterminant_line_info->user_info); if (ret >= MAX_ONE_MSG_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_DEST_MSG_LINE]"," dest_msg_line too long, should not bigger than %d!", MAX_ONE_MSG_LEN); return -1; } //generate line ret = snprintf(dest_line_for_maat,MAX_ONE_CONFIG_MSG_LINE_LEN,"%s %s",dest_msg_line,control_line_info); if (ret >= MAX_ONE_CONFIG_MSG_LINE_LEN) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GENERATE_DEST_LINE_FOR_MAAT]"," dest_line_for_maat too long, should not bigger than %d!", MAX_ONE_CONFIG_MSG_LINE_LEN); return -1; } break; default: MESA_HANDLE_RUNTIME_LOG(common->error_logger, RLOG_LV_FATAL, "[PZ_TYPE]" ,"pz_type:%d error!",common->pz_type); break; } //if(common->debug_switch) //printf("control_line_info:%s\n dest_msg_line:%s\ndest_line_for_maat=old_msg_line+control_line_info+dest_msg_line\n%s",control_line_info,dest_msg_line,dest_line_for_maat); return 0; } //check the format of determinant file and get valid and invalid msg int producer_t::read_determinant_file_get_valid_condition(determinant_line_info_t *determinant_line_info,int *valid_msg_num,int *invalid_msg_num,int *error_msg_num) { int local_valid_msg_num=*valid_msg_num; int local_invalid_msg_num=*invalid_msg_num; char line[MAX_ONE_CONFIG_MSG_LINE_LEN]={0},*ret_str=NULL; char outbuf[MAX_ONE_CONFIG_MSG_LINE_LEN];void *err_logger = common->error_logger; int cfg_num=0, i=0, ret=0; FILE*fp=fopen(determinant_line_info->file_path, "r"); if(fp==NULL) { MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]" ,"<%s +%d> update error,open %s failed." , __FILE__, __LINE__ , determinant_line_info->file_path); ret=-1; goto ERR_OUT; } fscanf(fp,"%d\n", &cfg_num); if(determinant_line_info->total_line_num != cfg_num) { MESA_HANDLE_RUNTIME_LOG(err_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]" ,"file %s config num not matched" , determinant_line_info->file_path); fclose(fp); fp = NULL; return -1; } //MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_INFO, "read_determinant_table_file", "file %s config num: %d", determinant_line_info->file_path, determinant_line_info->total_line_num); for(i = 0; i < cfg_num; i++) { line[sizeof(line)-1]='\0'; ret_str=fgets(line,sizeof(line),fp); if(ret_str==NULL) { MESA_HANDLE_RUNTIME_LOG(err_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]" ,"<%s +%d>update error, file %s line_num %d less than claimed %d" , __FILE__, __LINE__, determinant_line_info->file_path, i, cfg_num); break; } if(line[sizeof(line)-1]!='\0') { MESA_HANDLE_RUNTIME_LOG(err_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]" ,"update error, line size more than %u at of file %s:%d" ,sizeof(line), determinant_line_info->file_path, i); continue; } if(memchr(line,' ',strlen(line)) != NULL)//filter the line which contain blank { *error_msg_num++; continue; } /* is_vaild */ memset(outbuf, 0, sizeof(outbuf)); ret = rc_get_field_value(line, determinant_line_info->is_valid_location, determinant_line_info->is_valid_location, '\t', outbuf, sizeof(outbuf)); if(ret < 0){ MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]" ,"rc_get_field_value is_vaild error rc=%d",ret); ret=-1; goto ERR_OUT; } determinant_line_info->is_valid= atoi(outbuf); //update valid_msg_num invalid_msg_num ret = update_valid_condition(determinant_line_info->is_valid,valid_msg_num,invalid_msg_num,err_logger); if(ret < 0){ ret=-1; goto ERR_OUT; } } if(common->debug_switch) { MESA_HANDLE_RUNTIME_LOG(common->stat_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file:%s, msg_num:%d, valid msg num: %d, invalid msg num:%d----TOTAL valid msg num: %d, TOATAL invalid msg num:%d" ,determinant_line_info->file_path ,cfg_num ,*valid_msg_num-local_valid_msg_num ,*invalid_msg_num-local_invalid_msg_num ,*valid_msg_num,*invalid_msg_num); } /* MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file:%s, msg_num:%d, valid msg num: %d, invalid msg num:%d" ,determinant_line_info->file_path ,cfg_num ,*valid_msg_num-local_valid_msg_num ,*invalid_msg_num-local_invalid_msg_num); */ ERR_OUT: fclose(fp); return ret; } int producer_t::read_indeterminant_file_get_valid_condition(indeterminant_line_info_t*indeterminant_line_info,int *valid_msg_num,int *invalid_msg_num) { int ret = 0; int file_size = get_file_size(indeterminant_line_info->file_path); if(file_size <= 0) { return -1; } indeterminant_line_info->file_size = file_size; ret =update_valid_condition(indeterminant_line_info->is_valid,valid_msg_num,invalid_msg_num, common->error_logger); if(ret >= 0) { MESA_HANDLE_RUNTIME_LOG(common->stat_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file:%s,valid msg num: %d,invalid msg num:%d", indeterminant_line_info->file_path,indeterminant_line_info,*valid_msg_num,*invalid_msg_num); MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file:%s,valid msg num: %d,invalid msg num:%d", indeterminant_line_info->file_path,*valid_msg_num,*invalid_msg_num); } return ret; } int producer_t::analysis_index_get_valid_condition(int *valid_msg_num,int *invalid_msg_num,int *error_ms_num,char *index_name_path) { int ret=0; bool topic_ret=0; FILE* idx_fp=NULL; char line[131072]={0}; determinant_line_info_t determinant_line_info; indeterminant_line_info_t indeterminant_line_info; idx_fp=fopen(index_name_path, "r"); void *logger = common->runtime_logger; MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_INFO, "[READ_FILE_GET_VALID_CONDITION]", "read file %s ", index_name_path); fgets(line, sizeof(line), idx_fp); if(strlen(line)<2){ MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file %s,file content check error on first line error ", index_name_path); ret=-1; goto ERR_OUT; } while(fgets(line, sizeof(line), idx_fp)) { switch(common->pz_type) { case PZ_TYPE_DETERMINANT: memset(&determinant_line_info, 0, sizeof(determinant_line_info)); ret = parse_determinant(line); if(ret == false){ MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file %s ,file content check error on first line error", index_name_path); ret=-1; goto ERR_OUT; } topic_ret = topic_check(common->run_mode,determinant_line_info.topic,NULL); if(topic_ret == false) MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file %s ,file content check error, topic invalid", index_name_path); ret=sscanf(line, "%s\t%d\t%s\t%d\t%d\n", determinant_line_info.file_name, &determinant_line_info.total_line_num, determinant_line_info.file_path, &determinant_line_info.msg_id_location, &determinant_line_info.is_valid_location); if(ret==5 && determinant_line_info.total_line_num > 0)//jump over empty line { //get_valid_pz_num,get_invalid_pz_num ret = read_determinant_file_get_valid_condition(&determinant_line_info,valid_msg_num,invalid_msg_num,error_ms_num); if(ret <0) { ret=-1; goto ERR_OUT; } } else { ret=-1; goto ERR_OUT; } break; case PZ_TYPE_NOT_DETERMINANT: memset(&indeterminant_line_info, 0, sizeof(indeterminant_line_info)); ret = parse_indeterminant(line); if(ret == false){ if(line[0] == '\n') break; } if(line[0]!='\n' && ret == false){ MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file %s ,file content check error", index_name_path); return -1; } ret=sscanf(line, "%ld\t%ld\t%d\t%s\t%s\t%s\t%s", &indeterminant_line_info.seq, &indeterminant_line_info.msg_id, &indeterminant_line_info.is_valid, indeterminant_line_info.file_path, indeterminant_line_info.topic, indeterminant_line_info.file_md5, indeterminant_line_info.user_info); topic_ret = topic_check(common->run_mode,indeterminant_line_info.topic,NULL); if(indeterminant_line_info.seq == 0 || topic_ret == 0){ // we should retry it! if(common->debug_switch == DEBUG_SWITCH_ON) printf("read invalid file"); if(topic_ret == false) MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file %s ,file content check error, topic invalid", index_name_path); else MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_GET_VALID_CONDITION]", "read file %s ,file content check error", index_name_path); { ret=-1; goto ERR_OUT; } } if(ret==7)//jump over empty line { ret = read_indeterminant_file_get_valid_condition(&indeterminant_line_info,valid_msg_num,invalid_msg_num); if(ret <0) { ret=-1; goto ERR_OUT; } } else { ret=-1; goto ERR_OUT; } break; default: break; } } ERR_OUT: fclose(idx_fp); idx_fp = NULL; return ret; } int producer_t::read_determinant_file_write_redis(determinant_line_info_t *determinant_line_info) { int ret=0; int cfg_num=0, i=0; char outbuf[MAX_ONE_CONFIG_MSG_LINE_LEN] ={0}; void *logger = common->runtime_logger; void *err_logger= common->error_logger; char line[MAX_ONE_CONFIG_MSG_LINE_LEN]={0},*ret_str=NULL; char dest_line_for_maat[MAX_ONE_CONFIG_MSG_LINE_LEN]={0}; int local_valid_msg_index = variable_lines_for_maat_cmd->valid_msg_index; int local_invalid_msg_index = variable_lines_for_maat_cmd->invalid_msg_index; FILE*fp=fopen(determinant_line_info->file_path, "r"); if(fp==NULL) { return -1; } //filter first line fscanf(fp,"%d\n", &cfg_num); //In the "read_determinant_file_get_valid_condition" have check this /*if(determinant_line_info->total_line_num != cfg_num) { fclose(fp); return -1; }*/ for(i = 0; i < cfg_num; i++) { line[sizeof(line)-1]='\0'; ret_str=fgets(line,sizeof(line),fp); if(ret_str==NULL) { break; } if(line[sizeof(line)-1]!='\0')//filter too long line which length >5k { determinant_line_info->total_line_num--; MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","the line too long(>5k),skip this line.file name %s,line info:%s",determinant_line_info->file_name,line); continue; } if(memchr(line,' ',strlen(line)) != NULL)//filter the line which contain blank { determinant_line_info->total_line_num--; MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","the line contain blank,skip this line.file name %s,line info:%s",determinant_line_info->file_name,line); continue; } /* seq */ memset(outbuf, 0, sizeof(outbuf)); ret = rc_get_field_value(line, 1, 1, '\t', outbuf, sizeof(outbuf)); if(ret <0){ MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","rc_get_field_value seq rc=%d",ret); return ret; } determinant_line_info->seq= atoi(outbuf); /* topic */ ret = rc_get_field_value(line, 2, 2, '\t', determinant_line_info->topic, sizeof(determinant_line_info->topic)); if(ret <0){ MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","rc_get_field_value topic rc=%d",ret); return ret; } /* rule_id = msg_id */ memset(outbuf, 0, sizeof(outbuf)); ret = rc_get_field_value(line, determinant_line_info->msg_id_location, determinant_line_info->msg_id_location, '\t',outbuf, sizeof(outbuf)); if(ret <0){ MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","rc_get_field_value msg_id rc=%d",ret); return ret; } determinant_line_info->msg_id= atoi(outbuf); //printf("msg_id:%d\n",determinant_line_info->msg_id); /* is_vaild */ memset(outbuf, 0, sizeof(outbuf)); ret = rc_get_field_value(line, determinant_line_info->is_valid_location, determinant_line_info->is_valid_location, '\t', outbuf, sizeof(outbuf)); if(ret < 0){ MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","rc_get_field_value is_vaild rc=%d",ret); return ret; } determinant_line_info->is_valid= atoi(outbuf); //generate dest_part_msg_line memset(determinant_line_info->dest_part_msg_line,0,sizeof(determinant_line_info->dest_part_msg_line)); ret = rc_get_field_value(line, 3, -1, '\t', determinant_line_info->dest_part_msg_line, sizeof(determinant_line_info->dest_part_msg_line)); if(ret <0){ MESA_HANDLE_RUNTIME_LOG(err_logger, RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]","rc_get_field_value dest_part_msg_line rc=%d",ret); return ret; } //remove '\n' determinant_line_info->dest_part_msg_line[strlen(determinant_line_info->dest_part_msg_line)-1] = 0; //generate old_msg_line snprintf(determinant_line_info->old_msg_line,MAX_ONE_MSG_LEN,"%s",line); //gererate_user_info(); ret = generate_user_info(determinant_line_info->user_info); //get dest_format_line ret = generate_dest_line_for_maat_format((void *)determinant_line_info,dest_line_for_maat); //update msgs in maat redis ret =variable_lines_for_maat_cmd->maat_cmd_lines_update((void*)determinant_line_info,dest_line_for_maat); assert(ret>=0); } fclose(fp); i = 0; for(set::iterator it = (common->registered_table_info).begin();\ it!=(common->registered_table_info).end();\ it++) { if(memcmp(determinant_line_info->file_name,(*it).c_str(),strlen(determinant_line_info->file_name)) == 0) { FS_operate(common->stat_handle,common->fs_line_id[i],common->fs_column_id[COLUMN_TABLE_TOTAL_NUM],FS_OP_SET,cfg_num); FS_operate(common->stat_handle,common->fs_line_id[i],common->fs_column_id[COLUMN_TABLE_CORRECT_NUM],FS_OP_SET,determinant_line_info->total_line_num); FS_operate(common->stat_handle,common->fs_line_id[i],common->fs_column_id[COLUMN_TABLE_ERROR_NUM],FS_OP_SET,cfg_num-determinant_line_info->total_line_num); break; } i++; } if(common->debug_switch) { MESA_HANDLE_RUNTIME_LOG(common->stat_logger,RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]" ,"read file:%s, msg_num:%d, valid msg num: %d, invalid msg num:%d" , determinant_line_info->file_path ,cfg_num ,variable_lines_for_maat_cmd->valid_msg_index-local_valid_msg_index ,variable_lines_for_maat_cmd->invalid_msg_index-local_invalid_msg_index); } MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]" ,"read file:%s, msg_num:%d, valid msg num: %d, invalid msg num:%d" , determinant_line_info->file_path ,cfg_num ,variable_lines_for_maat_cmd->valid_msg_index-local_valid_msg_index ,variable_lines_for_maat_cmd->invalid_msg_index-local_invalid_msg_index); return 0; } int producer_t::read_indeterminant_file_write_redis_fdfs(indeterminant_line_info_t *indeterminant_line_info ) { int ret=0; //seq isvalid filepath MD5 UserInfo char dest_line_for_maat[MAX_ONE_CONFIG_MSG_LINE_LEN]={0}; MESA_HANDLE_RUNTIME_LOG(common->perf_logger,RLOG_LV_INFO, "[READ_FILE_WRITE_REDIS]", "the file:%s upload/delete to fdfs,average file size:%d" ,indeterminant_line_info->file_name,indeterminant_line_info->file_size); if(indeterminant_line_info->is_valid == 1) { //undo:upload file to fastdfs to get fdfs_file_id ret = upload_file_to_fdfs(indeterminant_line_info->file_path,indeterminant_line_info->fdfs_file_id); if(ret < 0) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[READ_FILE_WRITE_REDIS]", "the file upload to fdfs failed"); return -1; } cout<<"file_id:"<fdfs_file_id<runtime_logger,RLOG_LV_DEBUG, "[READ_FILE_WRITE_REDIS]", "the file id:%s upload to fdfs finish",indeterminant_line_info->fdfs_file_id); (common->file_name2fdfs_file_id)[indeterminant_line_info->file_name] = indeterminant_line_info->fdfs_file_id; //undo write into redis to persistant } else { //get fdfs_file_id to delete file(indeterminant_line_info->fdfs_file_id) //undo:delete file on fastdfs ret = delete_file_on_fdfs((common->file_name2fdfs_file_id)[indeterminant_line_info->file_name].c_str()); } ret = generate_dest_line_for_maat_format((void *)indeterminant_line_info,dest_line_for_maat); ret =variable_lines_for_maat_cmd->maat_cmd_lines_update((void *)indeterminant_line_info,dest_line_for_maat); MESA_HANDLE_RUNTIME_LOG(common->perf_logger,RLOG_LV_INFO, "[READ_FILE_WRITE_REDIS]", "the file:%ld upload/delete to fdfs finish",indeterminant_line_info->seq); return 0; } int producer_t::analysis_index_write_redis_fdfs(int valid_msg_num, int invalid_msg_num,char *index_name_path) { int ret=0; FILE* idx_fp=NULL; char line[MAX_ONE_MSG_LEN]; determinant_line_info_t determinant_line_info; indeterminant_line_info_t indeterminant_line_info; idx_fp=fopen(index_name_path, "r"); void *logger = common->runtime_logger; // variable_lines_for_maat_cmd is a point variable_lines_for_maat_cmd=new variable_for_maat_cmd_t(common,valid_msg_num,invalid_msg_num); MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_INFO, "[READ_FILE_WRITE_REDIS]", "read file %s again", index_name_path); //filter first line fgets(line, sizeof(line), idx_fp); while(fgets(line, sizeof(line), idx_fp)) { switch(common->pz_type) { case PZ_TYPE_DETERMINANT: memset(&determinant_line_info, 0, sizeof(determinant_line_info)); ret=sscanf(line, "%s\t%d\t%s\t%d\t%d\n", determinant_line_info.file_name, &determinant_line_info.total_line_num, determinant_line_info.file_path, &determinant_line_info.msg_id_location, &determinant_line_info.is_valid_location); if(ret==5 && determinant_line_info.total_line_num > 0)//jump over empty line { //read_determinant_file_write_redis ret = read_determinant_file_write_redis(&determinant_line_info); assert(ret>=0); if(ret <0) { ret=-1; goto ERR_OUT; } } else { ret=-1; goto ERR_OUT; } break; case PZ_TYPE_NOT_DETERMINANT: memset(&indeterminant_line_info, 0, sizeof(indeterminant_line_info)); ret=sscanf(line, "%ld\t%ld\t%d\t%s\t%s\t%s\t%s\n", &indeterminant_line_info.seq, &indeterminant_line_info.msg_id, &indeterminant_line_info.is_valid, indeterminant_line_info.file_path, indeterminant_line_info.topic, indeterminant_line_info.file_md5, indeterminant_line_info.user_info); if(ret==7)//jump over empty line { //file_name is seqence; sprintf(indeterminant_line_info.file_name,"%ld",indeterminant_line_info.seq); //old_msg_line for a correct is-valid column snprintf(indeterminant_line_info.old_msg_line,MAX_ONE_MSG_LEN,"%s",line); //printf("msg_id:%d\n",indeterminant_line_info.msg_id); ret = read_indeterminant_file_write_redis_fdfs(&indeterminant_line_info); //assert(ret>=0); } else { ret=-1; goto ERR_OUT; } break; default: break; } } //正常返回才添加Redis if(ret >= 0) { ret = variable_lines_for_maat_cmd->generate_maat_cmd_write_redis(); } else { MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL, "[WRITE-REDIS]", "Write redis error! Please look the error_stat.log."); goto ERR_OUT; } MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_INFO, "[WRITE-REDIS]", "total write %d msgs into redis,including %d valid msgs, %d invalid msgs" ,variable_lines_for_maat_cmd->valid_msg_index+variable_lines_for_maat_cmd->invalid_msg_index ,variable_lines_for_maat_cmd->valid_msg_index ,variable_lines_for_maat_cmd->invalid_msg_index); FS_passive_output(common->stat_handle); tensor_field_stat_reset(common); ERR_OUT: fclose(idx_fp); delete variable_lines_for_maat_cmd; return ret; } int producer_t::get_indeterminant_index_path(char *index_name_path, int index_name_len) { unsigned long full_path_ver = 0;int full_meta_type = UPDATE_NONE; char full_index_name[64] = {0};void *logger = common->runtime_logger; memset(full_index_name, 0, sizeof(full_index_name)); full_path_ver = producer_profile.version; full_meta_type = get_index_name(producer_profile.cfg_full_path, &full_path_ver, full_index_name, sizeof(full_index_name), common->pz_type, logger); if(full_meta_type == UPDATE_NONE) { MESA_HANDLE_RUNTIME_LOG(logger, RLOG_LV_DEBUG, "deal_indeterminant_file", "<%s +%d> indeterminant meta_type: %s", __FILE__, __LINE__, "UPDATE_NONE"); return UPDATE_NONE; } //producer_profile->version = full_path_ver; snprintf(index_name_path, index_name_len, "%s/%s", producer_profile.cfg_full_path, full_index_name); MESA_HANDLE_RUNTIME_LOG(logger, RLOG_LV_DEBUG, "deal_indeterminant_file", "<%s +%d> meta_type: %s\n%s", __FILE__, __LINE__, (full_meta_type==UPDATE_TYPE_FULL) ? "UPDATE_TYPE_FULL" : "UPDATE_TYPE_INC", index_name_path); return full_meta_type; } //获取index name 名称 int producer_t::get_determinant_index_path(char *index_name_path, int index_name_len) { unsigned long full_path_ver = 0, inc_path_ver = 0; int meta_type = UPDATE_NONE;void *logger = common->runtime_logger; int full_meta_type = UPDATE_NONE;int inc_meta_type = UPDATE_NONE; char full_index_name[64] = {0}, inc_index_name[64] = {0}; memset(full_index_name, 0, sizeof(full_index_name)); memset(inc_index_name, 0, sizeof(inc_index_name)); full_path_ver = producer_profile.version; inc_path_ver = producer_profile.version; full_meta_type = get_index_name(producer_profile.cfg_full_path, &full_path_ver, full_index_name, sizeof(full_index_name), common->pz_type, logger); inc_meta_type = get_index_name(producer_profile.cfg_inc_path, &inc_path_ver, inc_index_name, sizeof(inc_index_name), common->pz_type, logger); if(full_meta_type == UPDATE_NONE && inc_meta_type == UPDATE_NONE) { MESA_HANDLE_RUNTIME_LOG(logger, RLOG_LV_DEBUG, "deal_determinant_file","determinant meta_type: %s", "UPDATE_NONE"); return UPDATE_NONE; } if(full_meta_type == UPDATE_TYPE_FULL) { if(inc_meta_type == UPDATE_TYPE_FULL) { if(full_path_ver > inc_path_ver) { //producer_profile->version = full_path_ver; snprintf(index_name_path, index_name_len, "%s/%s", producer_profile.cfg_full_path, full_index_name); } else { //producer_profile->version = inc_path_ver; snprintf(index_name_path, index_name_len, "%s/%s", producer_profile.cfg_inc_path, inc_index_name); } } else { //producer_profile->version = full_path_ver; snprintf(index_name_path, index_name_len, "%s/%s", producer_profile.cfg_full_path, full_index_name); } meta_type = UPDATE_TYPE_FULL; } else { //producer_profile->version = inc_path_ver; snprintf(index_name_path, index_name_len, "%s/%s", producer_profile.cfg_inc_path, inc_index_name); meta_type = inc_meta_type; } MESA_HANDLE_RUNTIME_LOG(logger, RLOG_LV_DEBUG, "deal_determinant_file", "<%s +%d> meta_type: %s\n%s", __FILE__, __LINE__, (meta_type==UPDATE_TYPE_FULL) ? "UPDATE_TYPE_FULL" : "UPDATE_TYPE_INC", index_name_path); return meta_type; }