#include "consumer.h"

// ---------------------------------------------------------------------------
// File-local helpers
// ---------------------------------------------------------------------------

// Name of a Maat update type as printed in the log lines of this file
// (1 is printed as FULL, anything else as INC).
static const char *update_type_to_str(int update_type)
{
    return (update_type == 1) ? "UPDATE_TYPE_FULL" : "UPDATE_TYPE_INC";
}

// Appends '\n' to a NUL-terminated line and re-terminates it.
// The buffer must have at least one spare byte behind the current terminator.
static void append_newline(char *line)
{
    const size_t len = strlen(line);
    line[len] = '\n';
    line[len + 1] = '\0';
}

// Same character class that scanf treats as white space in the "C" locale.
static bool is_blank_char(char c)
{
    return c == ' ' || c == '\t' || c == '\n' || c == '\v' || c == '\f' || c == '\r';
}

// Bounded replacement for sscanf("%s"): skips leading white space, copies the
// next white-space delimited token into dst (truncated to dst_size - 1 chars,
// always NUL-terminated) and advances *cursor behind the token.
// Returns false and leaves dst untouched when no token is left.
static bool next_token(const char **cursor, char *dst, size_t dst_size)
{
    const char *p = *cursor;
    while (is_blank_char(*p))
        ++p;
    if (*p == '\0' || dst_size == 0)
    {
        *cursor = p;
        return false;
    }

    size_t copied = 0;
    while (*p != '\0' && !is_blank_char(*p))
    {
        if (copied + 1 < dst_size)
            dst[copied++] = *p;
        ++p;
    }
    dst[copied] = '\0';
    *cursor = p;
    return true;
}

// Destroys every per-table status object owned by the consumer and the pointer
// array itself. NULL slots (tables whose registration failed) are skipped.
static void release_register_status(consumer_t *consumer)
{
    if (consumer == NULL || consumer->file_status_for_register == NULL)
        return;

    common_module_t *common = consumer->common;
    for (int i = 0; i < common->register_table_num; i++)
    {
        status_for_register_t *slot = consumer->file_status_for_register[i];
        if (slot == NULL)
            continue;
        // file_status_destroy() deletes both sub-objects and NULLs the members.
        slot->file_status_destroy();
        delete slot;
    }
    delete[] consumer->file_status_for_register;
    consumer->file_status_for_register = NULL;
}

// ---------------------------------------------------------------------------
// Maat table callbacks (registered in consumer_t::register_table_to_maat_redis)
// ---------------------------------------------------------------------------

// Maat callback: a table update begins.
//   update_type - stored on the status object; 1 is logged as FULL, else INC.
//   u_para      - the status_for_register_t registered for this table.
// Runs the manager "start" stage and, when common->write_file == 1, the
// persistence "start" stage. Any failure is logged and then asserted on.
void notify_table_update_start(int update_type,void* u_para)
{
    status_for_register_t *reg = static_cast<status_for_register_t *>(u_para);
    common_module_t *common = reg->common;
    const char *type_name = update_type_to_str(update_type);

    reg->update_type = update_type;

    MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[TABLE_UPDATE_START]"
                            ,"notify_table_update_start file_name:%s..update_type:%s"
                            ,reg->file_status_for_manager->file_name
                            ,type_name);
    MESA_HANDLE_RUNTIME_LOG(common->stat_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_START]"
                            ,"notify_table_update_start file_name:%s..update_type:%s"
                            ,reg->file_status_for_manager->file_name
                            ,type_name);

    int ret = reg->business_related_manager_service_start();
    if (common->write_file == 1)
    {
        const int persist_ret = reg->persist_msg2local_file_consumer_service_start();
        if (ret >= 0)
            ret = persist_ret;
    }
    else
    {
        // when the 'write_file' switch is off, 'is_processed' must still be reset to 0
        reg->file_status_for_persistence->is_processed = 0;
    }

    if (ret < 0)
    {
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_START]"
                                ,"notify_table_update_start file_name:%s..update_type:%s error"
                                ,reg->file_status_for_manager->file_name
                                ,type_name);
        // The original format was "business % notify...", i.e. a "%n" conversion
        // with a space flag; three strings are passed, so it has to be "%s".
        MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_START]"
                                ,"business %s notify_table_update_start file_name:%s..update_type:%s error"
                                ,reg->common->local_state_for_manager->business_name
                                ,reg->file_status_for_manager->file_name
                                ,type_name);
    }
    assert(ret>=0);
    return;
}

// Maat callback: one line of the table being updated.
//   table_id   - unused here.
//   table_line - "<dest_writed_msg> <control-msg>" separated by a blank;
//                control-msg:     topic \t user-control-info
//                dest_writed_msg: filename \t is-valid and etc
//   u_para     - the status_for_register_t registered for this table.
// Failures are logged; the function never asserts.
void notify_table_update_ing(int table_id,const char* table_line,void* u_para)
{
    status_for_register_t *reg = static_cast<status_for_register_t *>(u_para);
    common_module_t *common = reg->common;
    char control_line_info[MAX_ONE_MSG_LEN] = {0};
    char dest_msg_line[MAX_ONE_MSG_LEN] = {0};
    int ret = 0;

    // Both scan targets have the size checked here, so a line shorter than that
    // cannot overflow either of them and always leaves room for the '\n' that
    // the later stages append to dest_msg_line.
    if (table_line != NULL && strlen(table_line) < sizeof(dest_msg_line))
        ret = sscanf(table_line,"%[^ ] %[^ ]",dest_msg_line,control_line_info);

    if (ret != 2)
    {
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_ING]"
                                ,"notify_table_update_ing:file_name:%s,parse msg error."
                                ,reg->file_status_for_manager->file_name);
        return;
    }

    // undo: topic filter module
    ret = reg->business_related_manager_service_ing(control_line_info,dest_msg_line);
    // A line the manager stage rejected is not persisted: its newline was never
    // appended and, for file tables, the destination path would be stale.
    if (ret >= 0 && common->write_file == 1)
        ret = reg->persist_msg2local_file_consumer_service_ing(control_line_info,dest_msg_line);

    MESA_HANDLE_RUNTIME_LOG(common->stat_logger,RLOG_LV_DEBUG,"[TABLE_UPDATE_ING]"
                            ,"notify_table_update_ing:file_name:%s,dest_msg_line :%s"
                            ,reg->file_status_for_persistence->tmp_file_name
                            ,dest_msg_line);

    if (ret < 0)
    {
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_ING]"
                                ,"notify_table_update_ing:file_name:%s error"
                                ,reg->file_status_for_manager->file_name);
        // "%s" instead of the original "% n..." (see notify_table_update_start).
        MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_START]"
                                ,"business %s notify_table_update_ing:file_name:%s error"
                                ,reg->common->local_state_for_manager->business_name
                                ,reg->file_status_for_manager->file_name);
    }
    //assert(ret>=0);
    return;
}

// Maat callback: the table update is finished.
// Only does work when the table has not been marked processed yet; always
// leaves is_processed == 1 behind. Failures are logged and then asserted on.
void notify_table_update_end(void* u_para)
{
    int ret = 0;
    status_for_register_t *reg = static_cast<status_for_register_t *>(u_para);
    common_module_t *common = reg->common;

    if (reg->file_status_for_persistence->is_processed == 0)
    {
        MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[TABLE_UPDATE_END]"
                                ,"notify_table_update_end:file:%s"
                                ,reg->file_status_for_persistence->tmp_file_name);
        MESA_HANDLE_RUNTIME_LOG(common->stat_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_END]"
                                ,"notify_table_update_end:file:%s"
                                ,reg->file_status_for_persistence->tmp_file_name);

        ret = reg->business_related_manager_service_end();
        if (common->write_file == 1)
        {
            const int persist_ret = reg->persist_msg2local_file_consumer_service_end();
            if (ret >= 0)
                ret = persist_ret;
        }

        //table 1.2
        if ((common->zg_switch)
            && (reg->file_status_for_manager->total_line_num > 0)
            && (common->run_mode == PRODUCER_MODE))
        {
            const int ret_sendlog = send_pub_order_no_for_manager(reg->file_status_for_manager);
            if (ret_sendlog < 0)
            {
                MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[MANAGER_SERVICE_END]","send_pub_order_no_for_manager (table 1.2) failed");
            }
            assert(ret_sendlog >= 0);
        }
    }

    reg->file_status_for_persistence->is_processed = 1;

    if (ret < 0)
    {
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_END]"
                                ,"notify_table_update_end:file_name:%s error"
                                ,reg->file_status_for_persistence->dest_file_name);
        MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[TABLE_UPDATE_END]"
                                ,"business %s notify_table_update_end:file_name:%s error "
                                ,reg->common->local_state_for_manager->business_name
                                ,reg->file_status_for_persistence->dest_file_name);
    }
    assert(ret>=0);
    return;
}

// ---------------------------------------------------------------------------
// consumer_t
// ---------------------------------------------------------------------------

// Reads the [CONSUMER] section of <config_path>/<COMMON_CONFIG_NAME>:
// sleep_time (default 3), retry_times (default 1), write_null_index_file
// (default 0) and the mandatory dest_path, then makes sure the tmp, full/index
// and inc/index directories below dest_path exist (via check_file_path).
// Returns 0 on success, -1 when the config path is too long or dest_path is
// missing.
int consumer_t::consumer_load_profile()
{
    char buf_dir[MAX_PATH_LENGTH] = {0};
    char full_path[MAX_PATH_LENGTH] = {0};
    char inc_path[MAX_PATH_LENGTH] = {0};
    unsigned int null_index_switch = 0;

    int ret = snprintf(buf_dir, MAX_PATH_LENGTH, "%s/%s",common->config_path, COMMON_CONFIG_NAME);
    // snprintf returns the untruncated length, so ">=" (not "==") means truncation.
    if (ret < 0 || ret >= MAX_PATH_LENGTH)
    {
        MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[CONSUMER_PROFILE]","config file load , dir exceed the max length!");
        return -1;
    }

    MESA_load_profile_uint_def (buf_dir, "CONSUMER", "sleep_time", &sleep_time, 3);
    MESA_load_profile_uint_def (buf_dir, "CONSUMER", "retry_times", &retry_times, 1);
    MESA_load_profile_uint_def (buf_dir, "CONSUMER", "write_null_index_file", &null_index_switch, 0);
    // The original loaded this value into a local that shadowed the
    // write_null_index_file read by register_table_to_maat_redis(), so the
    // configured value was never used.
    write_null_index_file = null_index_switch;

    ret = MESA_load_profile_string_nodef(buf_dir, "CONSUMER", "dest_path", dest_path, MAX_PATH_LENGTH);
    if (ret < 0)
    {
        MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[CONSUMER_PROFILE]","dest_path read failed ,check the %s!", buf_dir);
        return -1;
    }

    snprintf (tmp_path, MAX_PATH_LENGTH, "%s/tmp/", dest_path);
    check_file_path(tmp_path);
    snprintf (full_path, MAX_PATH_LENGTH, "%s/full/index/", dest_path);
    check_file_path(full_path);
    snprintf (inc_path, MAX_PATH_LENGTH, "%s/inc/index/", dest_path);
    check_file_path(inc_path);

    //MESA_load_profile_int_def(buf_dir, "CONSUMER", "gc_time", &consumer->gc_time, 60*60);
    MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[CONSUMER_PROFILE]","read config file %s -----", buf_dir);
    MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[CONSUMER_PROFILE]"," -----tmp_path: %s", tmp_path);
    MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[CONSUMER_PROFILE]"," -----dest_path: %s", dest_path);
    MESA_HANDLE_RUNTIME_LOG (common->runtime_logger,RLOG_LV_INFO,"[CONSUMER_PROFILE]"," -----file_mode: %d", common->pz_type);
    //consumer->gcwork = new GcWorker(consumer->dest_path, full_save_time);

    printf ("read config file %s -----\n", buf_dir);
    printf (" -----tmp_path: %s\n", tmp_path);
    printf (" -----dest_path: %s\n", dest_path);
    printf (" -----file_mode: %d\n", common->pz_type);
    return 0;
}

// Initialises the consumer by loading its profile.
// Returns 0 on success, -1 otherwise (after asserting in debug builds).
int consumer_t::consumer_init()
{
    const int ret = consumer_load_profile();
    assert(ret >=0);
    return (ret != 0) ? -1 : 0;
}

// Registers one table with Maat and hooks the three notify_* callbacks to it.
//   file_name      - table name handed to Maat_table_register.
//   register_index - slot of file_status_for_register that receives the new
//                    status object.
// On either registration failure the slot is left NULL and the failure is
// logged. Return value is unchanged from the original: the fflush() result on
// success, 0 on both failure paths.
// NOTE(review): returning 0 on failure looks accidental, but switching it to a
// negative value would send the caller down its cleanup path while callbacks of
// already-registered tables may still hold their status objects - confirm
// before changing.
int consumer_t::register_table_to_maat_redis(const char *file_name,int register_index)
{
    int ret = 0;
    void *logger = common->runtime_logger;
    void *err_logger = common->error_logger;

    const int table_id = Maat_table_register(common->feather,file_name);
    if (table_id < 0)
    {
        MESA_HANDLE_RUNTIME_LOG(err_logger,RLOG_LV_FATAL,"[TABLE_REGISTER]","Database table %s register failed.\n", file_name);
        MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TABLE_REGISTER]","Database table %s register failed.\n", file_name);
        if (common->debug_switch == DEBUG_SWITCH_ON)
            printf("Database table %s register failed.\n",file_name);
        // Never leave the slot uninitialised: later loops test it against NULL.
        file_status_for_register[register_index] = NULL;
        return ret;
    }

    //for zg;
    status_for_register_t *reg = new status_for_register_t(common);
    reg->file_status_for_manager = new status_for_manager(common,file_name);
    reg->file_status_for_persistence = new status_for_persistence(common,file_name,tmp_path,dest_path);
    reg->write_null_index_file = write_null_index_file;
    reg->retry_times = retry_times;
    file_status_for_register[register_index] = reg;

    ret = Maat_table_callback_register(common->feather, table_id,
                                       notify_table_update_start,
                                       notify_table_update_ing,
                                       notify_table_update_end,
                                       reg);
    if (ret < 0)
    {
        ret = reg->file_status_destroy();
        delete reg;
        // The original kept the deleted pointer in the array.
        file_status_for_register[register_index] = NULL;
        MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TABLE_REGISTER]","Maat callback register table %s failed.\n", file_name);
        MESA_HANDLE_RUNTIME_LOG(err_logger,RLOG_LV_FATAL,"[TABLE_REGISTER]","Maat CALLBACK register table %s failed.\n", file_name);
    }
    else
    {
        ret = fflush(reg->file_status_for_persistence->index_fp);
        MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TABLE_REGISTER]","Maat callback register table %s success.\n", file_name);
    }
    return ret;
}

// Starts consumer_consume_work(this) on a detached thread.
// Returns 0 on success, -1 when the thread attributes or the thread itself
// could not be created (the original ignored both results).
int consumer_t::consumer_thread_run()
{
    pthread_attr_t worker_thread_attr;
    pthread_t consumer_worker;

    if (pthread_attr_init(&worker_thread_attr) != 0)
        return -1;
    pthread_attr_setdetachstate(&worker_thread_attr,PTHREAD_CREATE_DETACHED);
    const int create_ret = pthread_create(&consumer_worker, &worker_thread_attr,consumer_consume_work, (void*)(this));
    pthread_attr_destroy(&worker_thread_attr);

    return (create_ret == 0) ? 0 : -1;
}

// ---------------------------------------------------------------------------
// status_for_register_t : "start" stage
// ---------------------------------------------------------------------------

// Manager stage of an update start: reads the current Maat version and resets
// the per-update counters of file_status_for_manager. Always returns 0.
int status_for_register_t::business_related_manager_service_start()
{
    long long version = 0;
    const int ret = Maat_read_state(common->feather,MAAT_STATE_VERSION, &version, sizeof(version));
    assert(ret==0);
    if (ret < 0)
    {
        // version is a long long, so "%lld" (the original used "%ld").
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[MANAGER_SERVICE_START]","Maat get version:%lld failed.\n", version);
    }

    file_status_for_manager->file_size = 0;
    file_status_for_manager->total_line_num = 0;
    file_status_for_manager->index_version = version;
    MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[MANAGER_SERVICE_START]","Start success");
    return 0;
}

// Persistence stage of an update start: copies the version from the manager
// status, resets the persistence counters, runs write_file_init() and opens
// the temporary index/config files that are not open yet.
// Returns -1 when write_file_init() fails, otherwise the result of the last
// open/write step that ran (asserted to be >= 0).
int status_for_register_t::persist_msg2local_file_consumer_service_start()
{
    status_for_persistence *persist = file_status_for_persistence;

    persist->index_version = file_status_for_manager->index_version;
    persist->is_processed = 0;
    persist->total_line_num = 0;

    int ret = persist->write_file_init(update_type);
    if (ret != 0)
    {
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[PERSISTENT_SERVICE_START]","write_file_init fail");
        return -1;
    }

    //DETERMINANT: open index file
    //INDETERMINANT: do not use index file 20180920
    if ((persist->index_fp == NULL) && (common->pz_type == PZ_TYPE_DETERMINANT))
    {
        ret = open_file(&(persist->index_fp), persist->tmp_index_file_name, INDEX_FILE, common->error_logger);
    }

    //INDETERMINANT: download file to file_path and write the config file
    //DETERMINANT: write the config file
    if (persist->file_fp == NULL)
    {
        ret = open_file(&(persist->file_fp), persist->tmp_file_name, NORMAL_FILE, common->error_logger);
        if (common->pz_type == PZ_TYPE_DETERMINANT)
            ret = write_file_add_num(persist->file_fp,0);
    }
    assert(ret >=0);
    MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[PERSISTENT_SERVICE_START]","Start success");
    return ret;
}

// ---------------------------------------------------------------------------
// status_for_register_t : per-line stage
// ---------------------------------------------------------------------------

// Manager stage for one message line.
//   control_line  - unused by this stage.
//   dest_msg_line - modified in place for PZ_TYPE_DETERMINANT: the trailing
//                   user-info column is cut off and a '\n' is appended.
// Returns -1 when the line cannot be parsed, 0 otherwise (results of the
// follow-up calls are not propagated, as in the original).
int status_for_register_t::business_related_manager_service_ing(char * control_line,char *dest_msg_line)
{
    int ret = 0;
    switch (common->pz_type)
    {
    case PZ_TYPE_DETERMINANT:
        ret = get_then_remove_user_info(dest_msg_line,file_status_for_manager->user_info);
        if (ret < 0)
            return -1;
        // Safe: removing the user-info freed at least one byte behind the line.
        append_newline(dest_msg_line);
        file_status_for_manager->file_size += strlen(dest_msg_line);
        file_status_for_manager->total_line_num += 1;
        break;

    case PZ_TYPE_NOT_DETERMINANT:
    {
        file_status_for_manager->total_line_num += 1;

        indeterminant_line_info_t indeterminant_line_info;
        // Zeroed so the error log below never prints an unset file_name.
        memset(&indeterminant_line_info, 0, sizeof(indeterminant_line_info));

        // NOTE(review): the "%s" conversions are unbounded; the field sizes of
        // indeterminant_line_info_t are not visible here - add widths once known.
        ret = sscanf(dest_msg_line,"%s\t%d\t%s\t%s\t%s"
                     ,indeterminant_line_info.file_name
                     ,&(indeterminant_line_info.is_valid)
                     ,indeterminant_line_info.file_path
                     ,indeterminant_line_info.file_md5
                     ,indeterminant_line_info.user_info);
        if (ret != 5)
        {
            // The check is for 5 columns; the original message said 7.
            MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[MANAGER_SERVICE_ING]","file_name:%s,recv from redis error( not 5 columns)! dest_msg_line:%s",indeterminant_line_info.file_name,dest_msg_line);
            return -1;
        }
        // ,&(indeterminant_line_info.seq)

        strcpy(file_status_for_manager->file_name,indeterminant_line_info.file_name);
        strcpy(file_status_for_manager->file_path,indeterminant_line_info.file_path);
        strcpy(file_status_for_manager->file_md5,indeterminant_line_info.file_md5);
        file_status_for_manager->pub_order_no = get_pub_order_no(file_status_for_manager->file_name
                                                                 ,common->local_state_for_manager->business_name
                                                                 ,file_status_for_manager->index_version);

        // ret = get_then_remove_user_info(dest_msg_line,file_status_for_manager->user_info);
        // dest_msg_line[strlen(dest_msg_line)]='\n';
        // NOTE(review): this parses the manager's stored user_info, not the one
        // just scanned into indeterminant_line_info - confirm that is intended.
        ret = parse_user_info(file_status_for_manager->user_info,&(common->local_state_for_manager->start_time));

        //INDETERMINANT: get dest_sample_file_name here
        if (indeterminant_line_info.is_valid == 1)
        {
            strcpy(file_status_for_persistence->dest_sample_file_name,indeterminant_line_info.file_path);
        }
        else
        {
            memset(file_status_for_persistence->dest_sample_file_name,0,MAX_PATH_LENGTH);
        }

        common->local_state_for_manager->end_time=common::Func::curr_time();
        //table 1.3
        if (common->zg_switch)
            ret = send_pub_order_log_for_manager(file_status_for_manager);
        break;
    }

    default:
        break;
    }
    return 0;
}

// Persistence stage for one message line.
//   control_line  - "topic" (DETERMINANT) or "topic \t fdfs-id" (INDETERMINANT).
//   dest_msg_line - line appended to the temporary config file; for
//                   INDETERMINANT a '\n' is appended in place first.
// Lines whose topic does not pass topic_check() are only logged.
// Returns the result of the last step that ran (0 when the line is filtered).
int status_for_register_t::persist_msg2local_file_consumer_service_ing(char * control_line,char *dest_msg_line)
{
    int i = 0;      // download retry counter (was uninitialised in the original)
    int ret = 0;
    bool topic_ret = false;
    char msg_topic[MAX_LENGTH] = {0};
    char other_control_info[MAX_ONE_CONTROL_MSG_LEN] = {0};
    const char *cursor = (control_line != NULL) ? control_line : "";

    //undo: topic filter module
    switch (common->pz_type)
    {
    case PZ_TYPE_DETERMINANT:
        // bounded equivalent of sscanf(control_line,"%s",msg_topic)
        next_token(&cursor, msg_topic, sizeof(msg_topic));
        topic_ret = topic_check(common->run_mode,msg_topic,common->node_topic);
        if (topic_ret == false)
        {
            MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG, "[PERSISTENT_SERVICE_ING]", "filter the config:%s,"
                                    ,dest_msg_line);
            break;
        }
        file_status_for_persistence->total_line_num++;

        // update the start time from the user-info of every message
        ret = parse_user_info(file_status_for_manager->user_info,&(common->local_state_for_manager->start_time));

        // the manager stage already appended the '\n'
        ret = append_to_write_file(file_status_for_persistence->file_fp, dest_msg_line, common->error_logger);
        break;

    case PZ_TYPE_NOT_DETERMINANT:
    {
        char local_fdfs_id[MAX_PATH_LENGTH] = {0};

        // bounded equivalent of sscanf(control_line,"%s\t%s",msg_topic,other_control_info)
        if (next_token(&cursor, msg_topic, sizeof(msg_topic)))
            next_token(&cursor, other_control_info, sizeof(other_control_info));

        topic_ret = topic_check(common->run_mode,msg_topic,common->node_topic);
        if (topic_ret == false)
        {
            MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG, "[PERSISTENT_SERVICE_ING]", "filter the config:%s,"
                                    ,dest_msg_line);
            break;
        }
        snprintf(local_fdfs_id,MAX_PATH_LENGTH,"%s",other_control_info);
        MESA_HANDLE_RUNTIME_LOG(common->perf_logger,RLOG_LV_INFO, "[PERSISTENT_SERVICE_ING]", "the file:%s, download from fdfs"
                                ,file_status_for_persistence->dest_file_name);

        // download from fdfs only when a destination sample file name is set
        if (strlen(file_status_for_persistence->dest_sample_file_name) != 0)
        {
            check_file_path(file_status_for_persistence->dest_sample_file_name); //check the file path
            ret = download_file_from_fdfs(local_fdfs_id,file_status_for_persistence->dest_sample_file_name);
            // If the download failed, retry.
            // NOTE(review): the loop bound and the "if(ret<0)" that follows were
            // damaged in the reviewed text; "i < retry_times" and the brace
            // layout are reconstructed - confirm against the repository.
            if (ret < 0)
            {
                do
                {
                    sleep(3);
                    ret = download_file_from_fdfs(local_fdfs_id,file_status_for_persistence->dest_sample_file_name);
                    i++;
                } while ((ret < 0) && (i < static_cast<int>(retry_times)));
            }
        }

        if (ret < 0)
        {
            MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[PERSISTENT_SERVICE_ING]", "the file:%s, download from fdfs failed"
                                    ,file_status_for_persistence->dest_sample_file_name);
            file_status_for_manager->total_line_num--;
            break;
        }
        else
        {
            MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO, "[PERSISTENT_SERVICE_ING]", "the file:%s, download from fdfs finish"
                                    ,file_status_for_persistence->dest_sample_file_name);
        }

        // append the line (with its newline) to the config file
        append_newline(dest_msg_line);
        ret = append_to_write_file(file_status_for_persistence->file_fp, dest_msg_line, common->error_logger);
        break;
    }
    }
    //assert(ret >=0);
    return ret;
}

// ---------------------------------------------------------------------------
// status_for_register_t : "end" stage
// ---------------------------------------------------------------------------

// Manager stage of an update end. For PZ_TYPE_DETERMINANT it derives the
// publish order number and a pseudo md5 (md5 of file_name + file_size), sends
// the "table 1.3" log when zg_switch is on and lines were received, and logs
// the line counters. In every mode the received bytes are added to the local
// state. Returns ret (asserted to be >= 0).
int status_for_register_t::business_related_manager_service_end()
{
    int ret = 0;
    if (common->pz_type == PZ_TYPE_DETERMINANT)
    {
        file_status_for_manager->pub_order_no = get_pub_order_no(file_status_for_manager->file_name
                                                                 ,common->local_state_for_manager->business_name
                                                                 ,file_status_for_manager->index_version);

        // fake_md5 = md5(file_name + file_size)
        char fake_md5[MAX_PATH_LENGTH] = {0};
        snprintf(fake_md5,MAX_PATH_LENGTH,"%s%d",file_status_for_manager->file_name,file_status_for_manager->file_size);
        string fake_md5_const(fake_md5);
        ret = msg2md5 (fake_md5_const.c_str(), file_status_for_manager->file_md5);
        assert(ret>=0);

        // table update end: send_pub_order_log_for_manager
        common->local_state_for_manager->end_time=common::Func::curr_time();
        //table 1.3
        if ((common->zg_switch) && (file_status_for_manager->total_line_num > 0))
            ret = send_pub_order_log_for_manager(file_status_for_manager);
        if (ret < 0)
        {
            MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[MANAGER_SERVICE_END]","send_pub_order_log_for_manager (table 1.3) failed");
        }
        assert(ret >= 0);

        // The original passed the count without any conversion specifier.
        MESA_HANDLE_RUNTIME_LOG(common->perf_logger,RLOG_LV_INFO, "[MANAGER_SERVICE_END]", " recv %d msgs from redis"
                                ,file_status_for_manager->total_line_num);
        MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO, "[MANAGER_SERVICE_END]", "table:%s ,producer send num %d,now receive num:%d"
                                ,file_status_for_manager->file_name
                                ,file_status_for_persistence->total_line_num
                                ,file_status_for_manager->total_line_num);
        if (file_status_for_manager->total_line_num > 0)
            cout<<"total_num:"<<file_status_for_persistence->total_line_num<<",now received num:"<<file_status_for_manager->total_line_num<<endl;
    }

    // NOTE(review): the text between "<<endl" and this call was lost in the
    // reviewed copy; whether the original stored the result in ret is unknown.
    common->local_state_for_manager->addOutBytes(file_status_for_manager->file_size);
    assert(ret>=0);
    MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[MANAGER_SERVICE_END]","End success");
    return ret;
}

// Persistence stage of an update end: publishes the per-table line count to
// the field-stat handle, finalises the temporary config file (write count,
// close, rename, remove when empty), maintains the index file for
// PZ_TYPE_DETERMINANT and, for the last table of a version, renames the index
// file and pushes the statistics.
// Returns the result of the last step that ran (asserted to be >= 0).
int status_for_register_t::persist_msg2local_file_consumer_service_end()
{
    int ret = 0;
    int is_last_updating_table = 0;
    int i = 0;
    char index_msg[MAX_ONE_CONFIG_MSG_LINE_LEN] = {0};
    void *err_logger = common->error_logger;
    void *perf_logger = common->perf_logger;
    status_for_persistence *persist = file_status_for_persistence;

    ret = Maat_read_state(common->feather,MAAT_STATE_LAST_UPDATING_TABLE, &is_last_updating_table, sizeof(is_last_updating_table));
    assert(ret==0);

    // Find this table's position in the registered set; it is used as the
    // index into fs_line_id.
    const size_t name_len = strlen(persist->file_name);
    for (set<string>::iterator it = (common->registered_table_info).begin();
         it != (common->registered_table_info).end();
         ++it, ++i)
    {
        // strncmp keeps the original prefix comparison but, unlike memcmp,
        // stops at the terminator of a shorter registered name.
        if (strncmp(persist->file_name,(*it).c_str(),name_len) == 0)
        {
            FS_operate(common->stat_handle,common->fs_line_id[i],common->fs_column_id[COLUMN_TABLE_TOTAL_NUM],FS_OP_SET,file_status_for_manager->total_line_num);
            break;
        }
    }

    switch (common->pz_type)
    {
    case PZ_TYPE_DETERMINANT:
        // update the line count in file_fp; close it; rename tmp -> dest
        ret = write_file_add_num(persist->file_fp,file_status_for_manager->total_line_num);
        common->total_config_num += file_status_for_manager->total_line_num;
        ret = close_file(&(persist->file_fp));
        persist->file_fp = NULL;
        ret = rename_file(persist->tmp_file_name, persist->dest_file_name, err_logger);

        ret = snprintf(index_msg,MAX_ONE_CONFIG_MSG_LINE_LEN, "%s\t%d\t%s\n",persist->file_name
                       ,file_status_for_manager->total_line_num
                       ,persist->dest_file_name);
        // non-empty tables get an index line, empty ones lose their config file
        if (file_status_for_manager->total_line_num > 0)
        {
            ret = append_to_write_file(persist->index_fp,index_msg,err_logger);
        }
        else
        {
            ret = remove(persist->dest_file_name);
        }
        // flush and close the index file after the append
        ret = fflush(persist->index_fp);
        ret = close_file(&(persist->index_fp));
        persist->index_fp = NULL;

        // last table of this version: rename tmp index -> dest index
        if (is_last_updating_table == 1)
        {
            if ((common->total_config_num == 0) && (write_null_index_file == CONSUMER_WRITE_NULL_INDEX_OFF))
            {
                MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO, "[PERSISTENT_SERVICE_END]", "Not write null index file,the version is:%lld",
                                        persist->index_version);
                break;
            }
            ret = rename_file(persist->tmp_index_file_name, persist->dest_index_file_name, err_logger);
            consumer_field_stat(common,persist->index_version);
            MESA_HANDLE_RUNTIME_LOG(perf_logger,RLOG_LV_INFO, "[PERSISTENT_SERVICE_END]", "finish receiving version:%lld from redis,then start to perist",
                                    persist->index_version);
            MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO, "[PERSISTENT_SERVICE_END]", "finish receiving version:%lld from redis,total config num:%d",
                                    persist->index_version,common->total_config_num);
            common->total_config_num = 0;
        }
        break;

    case PZ_TYPE_NOT_DETERMINANT:
        ret = close_file(&(persist->file_fp));
        persist->file_fp = NULL;
        // rename tmp_file_name -> dest_file_name
        ret = rename_file(persist->tmp_file_name, persist->dest_file_name, err_logger);
        if (file_status_for_manager->total_line_num <= 0)
        {
            ret = remove(persist->dest_file_name);
            MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[PERSISTENT_SERVICE_END]","Line num is 0,delete the file: %s",persist->dest_file_name);
        }
        if (is_last_updating_table == 1)
        {
            consumer_field_stat(common,persist->index_version);
        }
        break;

    default:
        break;
    }
    assert(ret>=0);
    MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_DEBUG,"[PERSISTENT_SERVICE_END]","End success");
    return ret;
}

// ---------------------------------------------------------------------------
// status_for_register_t : helpers
// ---------------------------------------------------------------------------

// Splits the last tab-separated column (the user-info) off dest_msg_line.
//   dest_msg_line - in/out; truncated at the last '\t', the removed tail is
//                   zero-filled.
//   user_info     - out; receives the removed column (at most
//                   MAX_USER_INFO_LEN - 1 chars).
// Returns 0 on success, -1 when an argument is NULL or the line has no tab.
int status_for_register_t::get_then_remove_user_info(char *dest_msg_line,char *user_info)
{
    if (dest_msg_line == NULL || user_info == NULL)
        return -1;

    char *pos_chr = strrchr(dest_msg_line,'\t');
    if (pos_chr == NULL)
    {
        // The original computed an offset from this NULL pointer.
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GET_USER_INFO]","get user info:%s failed.\n", dest_msg_line);
        return -1;
    }

    const size_t len = strlen(pos_chr + 1);   // length of the user-info column
    snprintf(user_info,MAX_USER_INFO_LEN,"%s",pos_chr + 1);
    if (strlen(user_info) < 16)
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[GET_USER_INFO]","get user info:%s failed.\n", user_info);

    // Remove the tab and the whole user-info. The original cleared one byte too
    // few, which left the tab in place when the user-info was empty.
    memset(pos_chr, 0, len + 1);
    return 0;
}

// Parses the leading integer of user_info into *start_time.
// Returns the sscanf result: 1 when a number was read, 0 or EOF otherwise
// (*start_time is left untouched in that case).
int status_for_register_t::parse_user_info(char *user_info,int64_t *start_time)
{
    // Scan into a long long: "%ld" only matches int64_t where long is 64 bits.
    long long parsed = 0;
    const int ret = sscanf(user_info,"%lld",&parsed);
    if (ret == 1)
        *start_time = static_cast<int64_t>(parsed);
    return ret;
}

// Deletes the manager and persistence status objects and NULLs both members.
// Always returns 0.
int status_for_register_t::file_status_destroy()
{
    if (file_status_for_manager != NULL)
    {
        delete file_status_for_manager;
        file_status_for_manager = NULL;
    }
    if (file_status_for_persistence != NULL)
    {
        delete file_status_for_persistence;
        file_status_for_persistence = NULL;
    }
    return 0;
}

// ---------------------------------------------------------------------------
// Free functions
// ---------------------------------------------------------------------------

// Publishes version, registered table count and total config count to the
// field-stat handle, triggers its output and resets the per-table statistics.
void consumer_field_stat(common_module_t *common,long long version)
{
    FS_operate(common->stat_handle,common->fs_status_id[STATUS_VERSION],0,FS_OP_SET,version);
    FS_operate(common->stat_handle,common->fs_status_id[STATUS_TABLE_NUM],0,FS_OP_SET,common->register_table_num);
    FS_operate(common->stat_handle,common->fs_status_id[STATUS_TOTAL_NUM],0,FS_OP_SET,common->total_config_num);
    FS_passive_output(common->stat_handle);
    tensor_field_stat_reset(common);
}

// Renames the temporary index file to its final name and, on success, creates
// the inc/index/full_config_index.<version> symlink to it, pushes the
// statistics and resets common->total_config_num.
// Returns the rename result on rename failure, -1 when the symlink path does
// not fit, otherwise the symlink() result.
int consumer_rename_index_file(common_module_t *common ,status_for_persistence *file_status_for_persistence)
{
    int ret = rename_file(file_status_for_persistence->tmp_index_file_name,
                          file_status_for_persistence->dest_index_file_name,
                          common->error_logger);
    if (ret < 0)
        return ret;

    char inc_sym_index_file_name[MAX_PATH_LENGTH] = {0};
    ret = snprintf(inc_sym_index_file_name, MAX_PATH_LENGTH, "%s/inc/index/full_config_index.%010lld",
                   file_status_for_persistence->dest_path, file_status_for_persistence->index_version);
    if (ret < 0 || ret >= MAX_PATH_LENGTH)
    {
        MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[consumer_rename_index_file]","file path too long:%s, should not bigger than %d!",inc_sym_index_file_name, MAX_PATH_LENGTH);
        return -1;
    }

    ret = symlink(file_status_for_persistence->dest_index_file_name,inc_sym_index_file_name);
    consumer_field_stat(common,file_status_for_persistence->index_version);
    common->total_config_num = 0;
    return ret;
}

// Registers every table of common->registered_table_info with Maat and, for
// PZ_TYPE_DETERMINANT, waits (up to 10 rounds of consumer->sleep_time seconds)
// for the first full update to settle before renaming the index file, because
// Maat does not signal the end of the first version.
// Returns a negative value when a registration or the index rename failed.
// NOTE(review): a second call (after a Maat handle change) replaces
// consumer->file_status_for_register without releasing the previous array;
// whether old callbacks may still reference it is not visible here.
int consumer_register_table(consumer_t *consumer)
{
    int retry = 0;
    int ret = 0;
    int i = 0;
    int register_index = 0;
    int rename_index = -1;
    long long maat_version = -1;
    int config_num = -1;
    common_module_t *common = consumer->common;

    // "()" value-initialises every slot to NULL so that failed registrations
    // and the cleanup path can be told apart from live objects.
    consumer->file_status_for_register = new status_for_register_t *[common->register_table_num]();

    for (set<string>::iterator it = (common->registered_table_info).begin();
         it != (common->registered_table_info).end();
         ++it)
    {
        ret = consumer->register_table_to_maat_redis((*it).c_str(),register_index++);
        if (ret < 0)
        {
            MESA_HANDLE_RUNTIME_LOG (common->error_logger, RLOG_LV_FATAL, "[TABLE_REGISTER]",
                                     "business %s register_table_to_maat_redis failed,please look at log/%s",
                                     common->local_state_for_manager->business_name,common->local_state_for_manager->business_name);
            return ret;
        }
    }

    // on the first update Maat does not notify the end of a version
    while ((common->pz_type == PZ_TYPE_DETERMINANT) && (retry < 10))
    {
        config_num = common->total_config_num;
        sleep(consumer->sleep_time);

        for (i = 0; i < common->register_table_num; i++)
        {
            status_for_register_t *slot = consumer->file_status_for_register[i];
            if (slot == NULL)
                continue;   // registration of this table failed
            if (slot->file_status_for_persistence->is_processed == 1)
            {
                if (maat_version == -1) // only the first processed table sets it
                    maat_version = slot->file_status_for_persistence->index_version;
                rename_index = i;
            }
        }

        if ((maat_version != -1) && (rename_index != -1))
        {
            const long long current_version =
                consumer->file_status_for_register[rename_index]->file_status_for_persistence->index_version;
            if (maat_version != current_version)
            {
                // versions are long long, hence "%lld" instead of the original "%d"
                MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[FIRST FULL UPDATE]", "Maat Version Changed,skip the first version: %lld,new version is %lld",
                                        maat_version,current_version);
                break;
            }
        }

        // The update is considered settled when the config count did not move
        // during the sleep.
        const bool settled = (config_num == common->total_config_num);
        if (settled && common->total_config_num == 0)
        {
            MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO, "[FIRST FULL UPDATE]", "There is no config.");
            break;
        }
        // rename_index == -1 (configs counted but no table marked processed yet)
        // indexed the array with -1 in the original; keep waiting instead.
        if (settled && rename_index != -1)
        {
            status_for_persistence *file_status_for_persistence =
                consumer->file_status_for_register[rename_index]->file_status_for_persistence;
            //rename index file here
            ret = consumer_rename_index_file(common,file_status_for_persistence);
            if (ret >= 0)
            {
                MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO, "[FIRST FULL UPDATE]", "finish receiving version:%lld from redis,total config num:%d",maat_version,common->total_config_num);
            }
            break;
        }

        retry++;
        if (retry == 10)
        {
            //rename index file here
            if ((rename_index != -1) && (common->total_config_num != 0))
            {
                status_for_persistence *file_status_for_persistence =
                    consumer->file_status_for_register[rename_index]->file_status_for_persistence;
                ret = consumer_rename_index_file(common,file_status_for_persistence);
                if (ret >= 0)
                {
                    MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[FIRST FULL UPDATE]", "Try 10 times,forced rename the index file,total config num:%d",common->total_config_num);
                }
            }
            else
            {
                MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[FIRST FULL UPDATE]", "Try 10 times,there is not cotain index file.");
            }
            break;
        }
        MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL, "[FIRST FULL UPDATE]", "Not rename index file,there is register table that without data!,try %d time(total 10 times),sleep 30 seconds",retry);
    }

    if (ret < 0)
    {
        MESA_HANDLE_RUNTIME_LOG (common->main_logger, RLOG_LV_INFO, "[FIRST FULL UPDATE]",
                                 "business %s first full update failed,redis may be null ,please look at log/%s"
                                 ,common->local_state_for_manager->business_name,common->local_state_for_manager->business_name);
        // delete[] consumer->file_status_for_register;
    }
    return ret;
}

// Thread entry point (see consumer_t::consumer_thread_run).
// Registers all tables once; when the reliability switch is on it then polls
// the redis state every 2 * maat_scan_interval seconds and re-registers the
// tables whenever the Maat handle changed. On a registration failure the
// per-table status objects are released. Always returns NULL.
void * consumer_consume_work(void *_consumer)
{
    consumer_t *consumer = static_cast<consumer_t *>(_consumer);
    if (consumer == NULL)
        return NULL;
    common_module_t *common = consumer->common;

    if (common->relibility_switch == TENSOR_REL_ON)
    {
        tensor_redis_state_check(common);
    }

    int ret = consumer_register_table(consumer);
    if (ret < 0)
    {
        release_register_status(consumer);
        return NULL;
    }

    //Check redis service status
    if (common->relibility_switch == TENSOR_REL_ON)
    {
        while (1)
        {
            sleep(common->maat_scan_interval*2);
            tensor_redis_state_check(common);
            if (common->maat_handle_state != TENSOR_MAAT_HANDLE_CHANGED)
                continue;

            ret = consumer_register_table(consumer);
            if (ret < 0)
            {
                release_register_status(consumer);
                return NULL;
            }
        }
    }
    return NULL;
}