#include "maat_redis.h" int maat_redis_init(common_module_t *_common) { //for maat config file int ret = 0; char redis_ip[MAX_PATH_LENGTH]={0}; char maat_redis_info[MAX_PATH_LENGTH]=""; char stat_file[MAX_PATH_LENGTH] = ""; char config_file[MAX_PATH_LENGTH] = ""; unsigned int redis_port=6379, start_port= 6379,port_num=0,redis_db=0; int g_iThreadNum =4; // int scan_interval_ms=10; int effective_interval_ms=10; int scan_detail=0; ret = snprintf(config_file,MAX_PATH_LENGTH,"%s/%s",_common->config_path,COMMON_CONFIG_NAME); //add reliability fucation by liujunpeng 20180515 if(_common->maat_init_type == TENSOR_MAAT_INTI_TYPE_START) { ret = MESA_load_profile_string_def (config_file, "MAAT", "redis_ip", redis_ip, MAX_PATH_LENGTH, "127.0.0.1"); memcpy(_common->redis_ip,redis_ip,strlen(redis_ip)); } else memcpy(redis_ip,_common->redis_ip,strlen(_common->redis_ip)); ret = MESA_load_profile_uint_def (config_file, "MAAT", "start_port", &start_port,6379); ret = MESA_load_profile_uint_def (config_file, "MAAT", "port_num", &port_num,1); ret = MESA_load_profile_int_def (config_file, "MAAT", "g_iThreadNum", &g_iThreadNum,4); ret = MESA_load_profile_int_def (config_file, "MAAT", "scan_interval_ms", &(_common->maat_scan_interval),10); ret = MESA_load_profile_int_def (config_file, "MAAT", "effective_interval_ms", &effective_interval_ms,10); ret = MESA_load_profile_int_def (config_file, "MAAT", "scan_detail", &scan_detail,0); ret = MESA_load_profile_uint_def (config_file, "MAAT", "redis_db", &redis_db,0); //random for redis instance srand((unsigned)time(NULL)); redis_port = rand()%port_num + start_port; ret = snprintf(maat_redis_info,MAX_PATH_LENGTH,"%s/%s",_common->config_path,COMMON_REDIS_NAME); ret = snprintf(stat_file,MAX_PATH_LENGTH,"%s/%s",_common->log_path,"maat_scan.log"); assert(ret>=0); //undo: connect multiple database //maat feature init _common->feather=Maat_feather(g_iThreadNum, maat_redis_info, _common->maat_logger); Maat_set_feather_opt(_common->feather,MAAT_OPT_INSTANCE_NAME,"demo", strlen("demo")+1); Maat_set_feather_opt(_common->feather, MAAT_OPT_REDIS_IP, redis_ip, strlen(redis_ip)+1); Maat_set_feather_opt(_common->feather, MAAT_OPT_REDIS_PORT, &redis_port, sizeof(redis_port)); Maat_set_feather_opt(_common->feather, MAAT_OPT_REDIS_INDEX, &redis_db, sizeof(redis_db)); Maat_set_feather_opt(_common->feather, MAAT_OPT_SCANDIR_INTERVAL_MS,&(_common->maat_scan_interval),sizeof(_common->maat_scan_interval)); Maat_set_feather_opt(_common->feather, MAAT_OPT_STAT_FILE_PATH, stat_file, strlen(stat_file)+1); Maat_set_feather_opt(_common->feather, MAAT_OPT_EFFECT_INVERVAL_MS,&effective_interval_ms, sizeof(effective_interval_ms)); Maat_set_feather_opt(_common->feather, MAAT_OPT_STAT_ON, NULL, 0); Maat_set_feather_opt(_common->feather, MAAT_OPT_PERF_ON, NULL, 0); //close maat version cumulative update Maat_set_feather_opt(_common->feather, MAAT_OPT_CUMULATIVE_UPDATE_OFF, NULL, 0); Maat_set_feather_opt(_common->feather, MAAT_OPT_SCAN_DETAIL, &scan_detail, sizeof(scan_detail)); Maat_initiate_feather(_common->feather); assert(_common->feather); if(_common->feather==NULL) { MESA_HANDLE_RUNTIME_LOG(_common->error_logger,RLOG_LV_FATAL,"[COMMON]","maat redis feather init failed!"); if(_common->debug_switch == DEBUG_SWITCH_ON) printf("Maat initial error, \n"); return -1; } if((_common->run_mode == PRODUCER_MODE) && (_common->maat_init_type == TENSOR_MAAT_INTI_TYPE_START)) Maat_cmd_flushDB(_common->feather); return ret; } int variable_for_maat_cmd_t::maat_cmd_lines_update(void *line_info,char *line) { //for maat redis /* int valid_msg_index = variable_lines_for_maat_cmd->valid_msg_index; int invalid_msg_index = variable_lines_for_maat_cmd->invalid_msg_index; struct Maat_line_t **p_valid_msg_lines =variable_lines_for_maat_cmd->p_valid_msg_lines ; struct Maat_line_t **p_invalid_msg_lines =variable_lines_for_maat_cmd->p_invalid_msg_lines; struct Maat_line_t *valid_msg_lines = variable_lines_for_maat_cmd->valid_msg_lines; struct Maat_line_t *invalid_msg_lines = variable_lines_for_maat_cmd->invalid_msg_lines; */ int64_t local_rule_id = -1;int local_is_valid = -1; char local_file_name[MAX_TABLE_NAME_LEN] ={0}; determinant_line_info_t *determinant_line_info; indeterminant_line_info_t *indeterminant_line_info ; set::iterator table; switch(common->pz_type) { case PZ_TYPE_DETERMINANT: determinant_line_info = (determinant_line_info_t *)line_info; local_rule_id = determinant_line_info ->msg_id; strncpy(local_file_name,const_cast(determinant_line_info ->file_name),strlen(determinant_line_info ->file_name)+1); local_is_valid = determinant_line_info ->is_valid; break; case PZ_TYPE_NOT_DETERMINANT: indeterminant_line_info = (indeterminant_line_info_t *)line_info; local_rule_id = indeterminant_line_info->msg_id; //important: table name in redis table = common->registered_table_info.begin(); strcpy(local_file_name,(*table).c_str()); local_is_valid = indeterminant_line_info ->is_valid; break; default: if(common->debug_switch == DEBUG_SWITCH_ON) printf("input column valid error"); MESA_HANDLE_RUNTIME_LOG(common->runtime_logger,RLOG_LV_FATAL,"[maat_cmd_lines_update]","input column valid error."); break; } if(local_is_valid == 1) { (valid_msg_lines[valid_msg_index]).label_id=0; (valid_msg_lines[valid_msg_index]).rule_id=local_rule_id; //malloc memory,then destroy after valid_msg_lines[valid_msg_index].table_name = new char[MAX_TABLE_NAME_LEN]; valid_msg_lines[valid_msg_index].table_line =new char[MAX_ONE_CONFIG_MSG_LINE_LEN]; strncpy(const_cast(valid_msg_lines[valid_msg_index].table_name),const_cast(local_file_name),strlen(local_file_name)+1); strncpy(const_cast(valid_msg_lines[valid_msg_index].table_line),const_cast(line),strlen(line)+1); //snprintf(valid_msg_lines[*valid_msg_index].table_name,MAX_TABLE_NAME_LEN,"%s",determinant_line_info->table_name); //snprintf(valid_msg_lines[*valid_msg_index].table_line,MAX_ONE_CONFIG_MSG_LINE_LEN,"%s",line); //strncpy(valid_msg_lines[*valid_msg_index].table_line,line,strlen(line)+1); valid_msg_lines[valid_msg_index].expire_after=0; p_valid_msg_lines[valid_msg_index]=valid_msg_lines+valid_msg_index; (valid_msg_index)++; } else if(local_is_valid == 0) { invalid_msg_lines[invalid_msg_index].label_id=0; invalid_msg_lines[invalid_msg_index].rule_id=local_rule_id; //malloc memory,then destroy after invalid_msg_lines[invalid_msg_index].table_name = new char[MAX_TABLE_NAME_LEN]; invalid_msg_lines[invalid_msg_index].table_line =new char[MAX_ONE_CONFIG_MSG_LINE_LEN]; //snprintf(invalid_msg_lines[*invalid_msg_index].table_name,MAX_TABLE_NAME_LEN,"%s",determinant_line_info->table_name); //snprintf(invalid_msg_lines[*invalid_msg_index].table_line,MAX_ONE_CONFIG_MSG_LINE_LEN,"%s",line); strncpy(const_cast(invalid_msg_lines[invalid_msg_index].table_name),const_cast(local_file_name),strlen(local_file_name)+1); strncpy(const_cast(invalid_msg_lines[invalid_msg_index].table_line),const_cast(line),strlen(line)+1); invalid_msg_lines[invalid_msg_index].expire_after=0; p_invalid_msg_lines[invalid_msg_index]=invalid_msg_lines+invalid_msg_index; (invalid_msg_index)++; } else { return -1; } //end for maat redis return 0; } int variable_for_maat_cmd_t::generate_maat_cmd_write_redis() { int ret=0; if((valid_msg_index != valid_msg_num ) || (invalid_msg_index != invalid_msg_num)) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_FATAL, "[WRITE_REDIS]" , "file content deal error,valid_msg_index:%d != valid_msg_num::%d...invalid_msg_index:%d != invalid_msg_num::%d... " ,valid_msg_index ,valid_msg_num ,invalid_msg_index ,invalid_msg_num); return -1; } MESA_HANDLE_RUNTIME_LOG(common->perf_logger,RLOG_LV_INFO, "[WRITE_REDIS]", "start to write %d msgs into redis " ,valid_msg_index+invalid_msg_index); if(valid_msg_index > 0) ret=Maat_cmd_set_lines(common->feather, const_cast(p_valid_msg_lines),valid_msg_index, MAAT_OP_ADD); if(invalid_msg_index > 0) ret=Maat_cmd_set_lines(common->feather, const_cast(p_invalid_msg_lines),invalid_msg_index, MAAT_OP_DEL); if(common->debug_switch == 1) assert(ret>=0); if(common->relibility_switch == TENSOR_REL_ON) { tensor_redis_state_check(common); } if(ret <= 0) { MESA_HANDLE_RUNTIME_LOG(common->error_logger,RLOG_LV_INFO, "[WRITE_REDIS]", " Maat_cmd_set_lines error,ret:%d ,please look at maat_stat.log! ",ret); return ret; } MESA_HANDLE_RUNTIME_LOG(common->perf_logger,RLOG_LV_INFO, "[WRITE_REDIS]", " finish writing %d msgs into redis,then msg start to synchronize by master-slave of redis " ,valid_msg_index+invalid_msg_index); return ret; }