/*************************
 * File Name :tensor.c
 * Author:Liujunpeng
 * Date:2018-05-15
 * Add switch_debug config item by
 **************************/
#include "reliability.h"

/*
 * Create and initialise the wiredLB handle stored in common->wlb_handle.
 *
 * Reads the topic (WIRED_INFO/APP_NAME), the group (MAAT/LEVEL) and the
 * health-check settings from "<common->config_path>/<COMMON_CONFIG_NAME>",
 * creates the handle with the WLB_PRODUCER role, applies the health-check
 * port, health-check interval and override options, then calls wiredLB_init.
 *
 * @param common  module context; config_path is read, wlb_handle is written.
 * @return        the value returned by wiredLB_init (asserted to be >= 0).
 */
int tensor_redis_state_check_init(struct common_module_t *common)
{
    unsigned int data_port = 0;
    unsigned int health_check_port = 0;
    unsigned int health_check_interval = 0;
    unsigned int override = 0;
    char common_config_name[MAX_PATH_LENGTH] = "";
    char topic[MAX_TOPIC_LEN] = "";
    char group[MAX_GROUP_LEN] = "";
    // char primary_addr[MAX_IP_LENGTH] = "";
    // char secondary_addr[MAX_IP_LENGTH] = "";

    snprintf(common_config_name, MAX_PATH_LENGTH, "%s/%s", common->config_path, COMMON_CONFIG_NAME);

    // The last argument of each loader is the value passed as its default.
    MESA_load_profile_string_def(common_config_name, "WIRED_INFO", "APP_NAME", topic, MAX_TOPIC_LEN, "TENSOR");
    MESA_load_profile_string_def(common_config_name, "MAAT", "LEVEL", group, MAX_GROUP_LEN, "level_1");
    // ret = MESA_load_profile_string_def(common_config_name, "MAAT", "WLB_CONS_OPT_PRIMARY_ADDR", primary_addr, MAX_IP_LENGTH,"127.0.0.1");
    // ret = MESA_load_profile_string_def(common_config_name, "MAAT", "WLB_CONS_OPT_SECONDARY_ADDR", secondary_addr, MAX_IP_LENGTH,"127.0.0.1");
    MESA_load_profile_uint_def(common_config_name, "MAAT", "HEALTH_CHECK_INTERVAL", &health_check_interval, 10);
    MESA_load_profile_uint_def(common_config_name, "MAAT", "HEALTH_CHECK_PORT", &health_check_port, 12030);
    // NOTE(review): DATA_PORT is loaded but never used in this function.
    MESA_load_profile_uint_def(common_config_name, "MAAT", "DATA_PORT", &data_port, 12031);
    (void)data_port;

    common->wlb_handle = wiredLB_create(topic, group, WLB_PRODUCER);
    // wiredLB_set_opt(WLB_CONS_OPT_PRIMARY_ADDR,primary_addr,strlen(primary_addr)+1);
    // wiredLB_set_opt(WLB_CONS_OPT_SECONDARY_ADDR,secondary_addr,strlen(secondary_addr)+1);
    wiredLB_set_opt(common->wlb_handle, WLB_OPT_HEALTH_CHECK_PORT, &health_check_port, sizeof(health_check_port));
    wiredLB_set_opt(common->wlb_handle, WLB_OPT_HEALTH_CHECK_INTERVAL, &health_check_interval, sizeof(health_check_interval));
    wiredLB_set_opt(common->wlb_handle, WLB_OPT_ENABLE_OVERRIDE, &override, sizeof(override));

    const int ret = wiredLB_init(common->wlb_handle);
    assert(ret >= 0);
    return ret;
}

/*
 * Switch the module to the redis address held in it->second.
 *
 * Logs the old and new address, copies the new address into common->redis_ip,
 * calls Maat_burn_feather on common->feather, marks the maat handle as changed
 * with the RELI init type, calls maat_redis_init, and logs the new address to
 * the error logger.
 *
 * @param it      iterator to a (level, ip) entry; must be dereferenceable.
 * @param common  module context that is updated in place.
 * @return        always 0.
 */
int tensor_change_redis(std::map<std::string, std::string>::iterator it, struct common_module_t *common)
{
    const std::string &new_ip = it->second;

    MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_FATAL, "[RELIABILITY]",
                             "Redis is changed!Old redis IP:%s ,New redis IP:%s",
                             common->redis_ip, new_ip.c_str());

    // Copy the address together with its terminating NUL (size() + 1 bytes).
    // The previous sprintf("%c", '\0') wrote two NUL bytes, i.e. one byte
    // more than required.
    // NOTE(review): the capacity of common->redis_ip is not visible here;
    // confirm it can always hold the address plus the terminator.
    memcpy(common->redis_ip, new_ip.c_str(), new_ip.size() + 1);

    Maat_burn_feather(common->feather);
    common->maat_init_type = TENSOR_MAAT_INIT_TYPE_RELI;
    common->maat_handle_state = TENSOR_MAAT_HANDLE_CHANGED;
    maat_redis_init(common);

    MESA_HANDLE_RUNTIME_LOG (common->error_logger, RLOG_LV_FATAL, "[RELIABILITY]",
                             "Redis is changed!New redis ip is:%s", common->redis_ip);
    return 0;
}

/*
 * Check whether the redis address currently in use is still listed by
 * wiredLB and, if not, pick a replacement by level.
 *
 * Lookup order (from the original comment): old ip ==> judian ==> fenzhongxin.
 *   1. If common->redis_ip is among the listed addresses, nothing changes.
 *   2. Otherwise look up common->level as a full key.
 *   3. Otherwise look up common->level truncated to
 *      MAX_FZX_LEVEL_PATH_LENGTH characters.
 *   4. If nothing matches, the state stays NOT_CHANGE and an error is logged.
 *
 * @param common  module context; redis_ip / maat_handle_state may be updated.
 * @return        always 0.
 */
int tensor_redis_state_check(struct common_module_t *common)
{
    struct WLB_consumer_t redis_ip_list[MAX_REDIS_IP_NUM];
    std::set<std::string> redis_ip_set;
    // Automatic storage instead of new/delete: released on every exit path.
    std::map<std::string, std::string> redis_level_map;
    // One spare byte that is never overwritten keeps the buffer NUL-terminated.
    char level_string[MAX_FZX_LEVEL_PATH_LENGTH + 1] = "";

    const int listed = wiredLB_list(common->wlb_handle, MAX_REDIS_IP_NUM, redis_ip_list);
    for (int i = 0; i < listed; i++)
    {
        const std::string ip_addr = redis_ip_list[i].ip_addr;
        redis_ip_set.insert(ip_addr);

        //TODO:work balance is not implement
        // Record the judian ip: map::insert keeps the first ip inserted for a
        // given key, later ones with the same key are ignored.
        redis_level_map.insert(std::make_pair(std::string(redis_ip_list[i].user_tag), ip_addr));

        // Record the fenzhongxin ip under the truncated tag; again only the
        // first ip inserted for a key is kept.
        if (common->run_mode == CONSUMER_MODE)
        {
            // strncpy stops reading at the source terminator and zero-pads,
            // producing the same C string as the former fixed-length memcpy.
            strncpy(level_string, redis_ip_list[i].user_tag, MAX_FZX_LEVEL_PATH_LENGTH);
            redis_level_map.insert(std::make_pair(std::string(level_string), ip_addr));
        }
    }

    //Search by level from old ip==>judian==>fenzhongxin
    if (redis_ip_set.find(common->redis_ip) != redis_ip_set.end())
    {
        common->maat_handle_state = TENSOR_MAAT_HANDLE_NOT_CHANGE;
        MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_FATAL, "[RELIABILITY]",
                                 "Redis is OK ip is:%s", common->redis_ip);
        return 0;
    }

    std::map<std::string, std::string>::iterator it = redis_level_map.find(common->level);
    if (it != redis_level_map.end())
    {
        tensor_change_redis(it, common);
        return 0;
    }

    strncpy(level_string, common->level, MAX_FZX_LEVEL_PATH_LENGTH);
    it = redis_level_map.find(level_string);
    if (it != redis_level_map.end())
    {
        tensor_change_redis(it, common);
        return 0;
    }

    common->maat_handle_state = TENSOR_MAAT_HANDLE_NOT_CHANGE;
    MESA_HANDLE_RUNTIME_LOG (common->error_logger, RLOG_LV_FATAL, "[RELIABILITY]",
                             "Redis is out of service!Old redis ip is:%s", common->redis_ip);
    return 0;
}