/************************* * File Name :tensor.c * Author:Li Bi * Date:2018-01-08 * * Date:2018-03-06 * Add switch_debug config item by liujunpeng **************************/ #include "tensor.h" const char * TENSOR_K18_version_VERSION_20180921 = "TENSOR_K18_version_VERSION_20180921"; int common_module_init(common_module_t **common_module,int run_mode,char *config_path,char *log_path,char *project_name,void *main_logger) { int ret = 0; char common_config_name[MAX_PATH_LENGTH]; common_module_t *_common = new common_module_t(); _common->run_mode = run_mode; ret = common_init(config_path,log_path,project_name,main_logger,common_config_name,_common); if( ret < 0) { ret= -1; MESA_HANDLE_RUNTIME_LOG(_common->error_logger ,RLOG_LV_FATAL,"[COMMON]","common init failed!"); goto ERR_OUT; } ret = maat_redis_init(_common); if(ret < 0) { ret= -1; MESA_HANDLE_RUNTIME_LOG(_common->error_logger ,RLOG_LV_FATAL,"[COMMON]","maat_redis_init init failed!"); goto ERR_OUT; } if(_common->relibility_switch == TENSOR_REL_ON) { ret = tensor_redis_state_check_init(_common); if(ret <0) { ret= -1; MESA_HANDLE_RUNTIME_LOG(_common->error_logger ,RLOG_LV_FATAL,"[COMMON]","redis_state_check init failed!"); goto ERR_OUT; } } if(_common->pz_type == PZ_TYPE_NOT_DETERMINANT) { //fdfs:config_path/fdfs_client.conf char config_path_fdfs[MAX_PATH_LENGTH]; strcpy(config_path_fdfs, config_path); strcat(config_path_fdfs, "/fdfs_client.conf"); if(fastdfs_init(config_path_fdfs,_common) < 0) { ret= -1; MESA_HANDLE_RUNTIME_LOG(_common->error_logger ,RLOG_LV_FATAL,"[COMMON]","fastdfs init failed!"); goto ERR_OUT; } } *common_module = _common; //deal with error ERR_OUT: assert(ret >= 0); return ret; } int my_business_producer_thread_run(business_t *business_handle) { pthread_t my_producer_worker; pthread_attr_t worker_thread_attr; pthread_attr_init(&worker_thread_attr); pthread_attr_setdetachstate(&worker_thread_attr,PTHREAD_CREATE_DETACHED); pthread_create(&my_producer_worker, &worker_thread_attr,my_business_producer_produce_work, (void*)business_handle); return 0; } int my_business_consumer_thread_run(business_t *business_handle) { pthread_t my_consumer_worker; pthread_attr_t worker_thread_attr; pthread_attr_init(&worker_thread_attr); pthread_attr_setdetachstate(&worker_thread_attr,PTHREAD_CREATE_DETACHED); pthread_create(&my_consumer_worker, &worker_thread_attr,my_business_consumer_consume_work, (void*)business_handle); return 0; } int get_common_profile_from_consul(char *config_path) { int ret = 0; void *handle = NULL; char config_file[MAX_PATH_LENGTH]={0}; snprintf(config_file,MAX_PATH_LENGTH,"%s/%s",config_path,COMMON_CONFIG_NAME); handle=wired_cfg_create("TENSOR",config_file); ret=wired_cfg_init(handle); wired_cfg_destroy(handle); return ret; } int read_profile_from_tensor(vector &all_business,void ** logger,int *debug_switch,int *run_mode) { int ret=0; int log_lv = 0; unsigned int business_num = 1; char log_path[MAX_PATH_LENGTH] = ""; char all_business_name[MAX_ONE_CONTROL_MSG_LEN] = ""; //main config ret = MESA_load_profile_uint_def (TENSOR_CONFIG_FILE, "ALL_BUSINESS", "business_num", &business_num, 1); ret = MESA_load_profile_string_nodef (TENSOR_CONFIG_FILE, "ALL_BUSINESS", "all_business_name", all_business_name, MAX_ONE_CONTROL_MSG_LEN); ret = MESA_load_profile_string_def (TENSOR_CONFIG_FILE, "TENSOR", "log_path", log_path, MAX_PATH_LENGTH,"./log/tensor.log"); ret = MESA_load_profile_int_def (TENSOR_CONFIG_FILE, "TENSOR", "log_lv", &log_lv, 10); ret = MESA_load_profile_int_def (TENSOR_CONFIG_FILE, "TENSOR", "debug_switch", debug_switch, 0); ret = MESA_load_profile_int_def (TENSOR_CONFIG_FILE, "TENSOR", "run_mode", run_mode, 0); common::Func::split_string(all_business_name,',', all_business); //main log *logger = MESA_create_runtime_log_handle(log_path, log_lv); if(business_num != all_business.size()) ret = -1; return ret; } int main(int argc, char* argv[]) { int ret = 0; int debug_switch = 0; int run_mode = 0; void *logger=NULL; int business_num=1; vector all_business; char project_name_path[MAX_PATH_LENGTH] = ""; signal(SIGTTOU, SIG_IGN); ret = read_profile_from_tensor(all_business,&logger,&debug_switch,&run_mode); assert(ret >=0); if(ret <0) { MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TENSOR]","tensor.conf init failed!"); return -1; } business_num =all_business.size(); business_t *business_handle = new business_t[business_num]; ret = snprintf(project_name_path,MAX_PATH_LENGTH,"%s",argv[0]); for(int business_index =0;business_index < business_num;business_index++) { //init business_t ./business/config ./business/log memset(business_handle+business_index,0, sizeof(business_t)); ret = snprintf(business_handle[business_index].my_business_name,MAX_LENGTH,"%s",all_business[business_index].c_str()); ret = snprintf(business_handle[business_index].my_business_config_path,MAX_PATH_LENGTH,"%s/%s",CONFIG_PATH,business_handle[business_index].my_business_name); ret = snprintf(business_handle[business_index].my_business_log_path,MAX_PATH_LENGTH,"%s/%s",LOG_PATH,business_handle[business_index].my_business_name); ret = snprintf(business_handle[business_index].my_project_name,MAX_PATH_LENGTH,"%s",project_name_path); business_handle[business_index].main_logger = logger; business_handle[business_index].debug_switch = debug_switch; if(access(business_handle[business_index].my_business_config_path, 0) == -1) { ret = -1; MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TENSOR]","conf %s is not existent!",business_handle[business_index].my_business_config_path); } if(ret < 0) { MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TENSOR]","business_handle init failed!"); delete[] business_handle; return -1; } assert(ret >=0); ret = get_common_profile_from_consul(business_handle[business_index].my_business_config_path); if(ret <0) { MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TENSOR]","Get common.conf from consul failed!"); } assert(ret >=0); if(run_mode == 0) { if(business_handle[business_index].debug_switch == 1) { printf("**************************\n"); printf("%s producer start\n",business_handle[business_index].my_business_name); printf("**************************\n"); } ret = my_business_producer_thread_run(&(business_handle[business_index])); } else if(run_mode == 1) { if(business_handle[business_index].debug_switch == 1) { printf("**************************\n"); printf("%s consumer start\n",business_handle[business_index].my_business_name); printf("**************************\n"); } ret = my_business_consumer_thread_run(&(business_handle[business_index])); } else { MESA_HANDLE_RUNTIME_LOG(logger,RLOG_LV_FATAL,"[TENSOR]","run_mode error init failed!,exit!!!"); return -1; } } while(1) { sleep(100000); } return 0; } void *my_business_producer_produce_work(void * _business_handle) { int ret = 0; producer_t *producer=NULL; common_module_t *common =NULL; consumer_t *consumer_for_manager= NULL; business_t *business_handle = (business_t *)_business_handle; ret = common_module_init(&common,PRODUCER_MODE,business_handle->my_business_config_path,business_handle->my_business_log_path, business_handle->my_project_name,business_handle->main_logger); if (ret < 0) { //error occured , wirte log MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[PRODUCER]","common init failed!"); MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[COMMON_INIT]","business:%s ,common init failed! ,please look at log/%s",business_handle->my_business_name,business_handle->my_business_name); ret=-1; goto ERR_OUT; } assert(ret>=0); MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[PRODUCER]","start init ---"); MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[PRODUCER]"," ---------"); if(business_handle->debug_switch == 1) { printf ("[PRODUCER] start init ---\n"); } FS_operate(common->stat_handle,common->fs_status_id[STATUS_TABLE_NUM],0,FS_OP_SET,common->register_table_num); if(common->zg_switch) { //modified by zzy manager_service_info manager_service; ret = manager_service.manager_init(common); ret = manager_service.daily_manager_service_thread_run(common); //modified by zzy } producer = new producer_t(common); ret = producer->producer_init(); if (ret != 0) { //error ,write log MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[PRODUCER]","producer init failed!"); MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[PRODUCER_INIT]","business:%s ,producer init failed! ,please look at log/%s",business_handle->my_business_name,business_handle->my_business_name); ret=-1; goto ERR_OUT; } //init finished! MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[PRODUCER]","init success---------"); //run the msg producer to write msg into redis MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[PRODUCER]","start run --------------"); if(business_handle->debug_switch == 1) { printf ("[PRODUCER] start run---\n"); } //producer main function producer->producer_thread_run(); if(common->zg_switch) { //after first full_index write into redis //wait 8 min sleep(60*8); //manager module consumer_for_manager = new consumer_t(common); ret = consumer_for_manager->consumer_init(); if (ret != 0) { //error ,write log MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[CONSUMER]","consumer_for_manager init failed!"); MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[CONSUMER]","business:%s ,consumer_for_manager init failed! ,please look at log/%s",business_handle->my_business_name,business_handle->my_business_name); ret=-1; goto ERR_OUT; } //init finished! MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[CONSUMER_FOR_MANAGER]","init success---------"); //run the msg consumer MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[CONSUMER_FOR_MANAGER]","start run --------------"); if(business_handle->debug_switch == 1) { printf ("[CONSUMER_FOR_MANAGER] start run ---\n"); } //consumer main function consumer_for_manager->consumer_thread_run(); } return NULL; ERR_OUT: Maat_burn_feather(common->feather); return NULL; } void *my_business_consumer_consume_work(void * _business_handle) { int ret = 0; consumer_t *consumer; common_module_t *common =NULL; business_t *business_handle = (business_t *)_business_handle; ret = common_module_init(&common,CONSUMER_MODE,business_handle->my_business_config_path,business_handle->my_business_log_path, business_handle->my_project_name,business_handle->main_logger); if (ret < 0) { //error occured , wirte log MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[PRODUCER]","common init failed!"); MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[COMMON_INIT]","business:%s ,common init failed!,please look at log/%s",business_handle->my_business_name,business_handle->my_business_name); ret=-1; goto ERR_OUT; } if(business_handle->debug_switch == 1) { printf ("[CONSUMER] start init ---\n"); } MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[CONSUMER]","start init ---"); MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[CONSUMER]"," ---------"); if(common->zg_switch) { //modified by zzy manager_service_info manager_service; ret = manager_service.manager_init(common); //ret = manager_service.daily_manager_service_thread_run(common); //modified by zzy } //init finished! MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[CONSUMER]","init success---------"); //run the msg consumer MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[CONSUMER]","start run --------------"); consumer = new consumer_t(common); ret = consumer->consumer_init(); if (ret != 0) { //error ,write log MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL,"[CONSUMER]","consumer init failed!"); MESA_HANDLE_RUNTIME_LOG(common->main_logger,RLOG_LV_FATAL,"[CONSUMER]","business:%s ,consumer init failed!,please look at log/%s",business_handle->my_business_name,business_handle->my_business_name); ret=-1; goto ERR_OUT; } //init finished! MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_INFO,"[CONSUMER]","init success---------"); //run the msg consumer MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[CONSUMER]","start run --------------"); if(business_handle->debug_switch == 1) { printf ("[CONSUMER] start run ---\n"); } //consumer main function,register once ,then quit ,driven by maat redis consumer->consumer_thread_run(); return NULL; ERR_OUT: Maat_burn_feather(common->feather); return NULL; }