1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
/*************************
* File Name :tensor.c
* Author:Liujunpeng
* Date:2018-05-15
* Add switch_debug config item by
**************************/
#include "reliability.h"
int tensor_redis_state_check_init(struct common_module_t *common)
{
int ret;
unsigned int data_port = 0;
unsigned int health_check_port = 0;
unsigned int health_check_interval = 0;
unsigned int override = 0;
char common_config_name[MAX_PATH_LENGTH] = "";
char topic[MAX_TOPIC_LEN] = "";
char group[MAX_GROUP_LEN] = "";
// char primary_addr[MAX_IP_LENGTH] = "";
// char secondary_addr[MAX_IP_LENGTH] = "";
struct WLB_consumer_t consumer;
memset(&consumer,0,sizeof(struct WLB_consumer_t));
ret = snprintf(common_config_name,MAX_PATH_LENGTH,"%s/%s",common->config_path,COMMON_CONFIG_NAME);
ret = MESA_load_profile_string_def(common_config_name, "WIRED_INFO", "APP_NAME", topic, MAX_TOPIC_LEN,"TENSOR");
ret = MESA_load_profile_string_def(common_config_name, "MAAT", "LEVEL", group, MAX_GROUP_LEN,"level_1");
// ret = MESA_load_profile_string_def(common_config_name, "MAAT", "WLB_CONS_OPT_PRIMARY_ADDR", primary_addr, MAX_IP_LENGTH,"127.0.0.1");
// ret = MESA_load_profile_string_def(common_config_name, "MAAT", "WLB_CONS_OPT_SECONDARY_ADDR", secondary_addr, MAX_IP_LENGTH,"127.0.0.1");
ret = MESA_load_profile_uint_def(common_config_name, "MAAT", "HEALTH_CHECK_INTERVAL", &health_check_interval, 10);
ret = MESA_load_profile_uint_def(common_config_name, "MAAT", "HEALTH_CHECK_PORT", &health_check_port, 12030);
ret = MESA_load_profile_uint_def(common_config_name, "MAAT", "DATA_PORT", &data_port, 12031);
common->wlb_handle = wiredLB_create(topic,group,WLB_PRODUCER);
// wiredLB_set_opt(WLB_CONS_OPT_PRIMARY_ADDR,primary_addr,strlen(primary_addr)+1);
// wiredLB_set_opt(WLB_CONS_OPT_SECONDARY_ADDR,secondary_addr,strlen(secondary_addr)+1);
wiredLB_set_opt(common->wlb_handle,WLB_OPT_HEALTH_CHECK_PORT,&health_check_port,sizeof(health_check_port));
wiredLB_set_opt(common->wlb_handle,WLB_OPT_HEALTH_CHECK_INTERVAL,&health_check_interval,sizeof(health_check_interval));
wiredLB_set_opt(common->wlb_handle,WLB_OPT_ENABLE_OVERRIDE,&override,sizeof(override));
ret=wiredLB_init(common->wlb_handle);
assert(ret >= 0 );
return ret;
}
int tensor_change_redis(map<string,string>::iterator it,struct common_module_t *common)
{
MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_FATAL, "[RELIABILITY]",
"Redis is changed!Old redis IP:%s ,New redis IP:%s",
common->redis_ip,(*it).second.c_str());
memcpy(common->redis_ip,(*it).second.c_str(),(*it).second.size());
sprintf(common->redis_ip+(*it).second.size(),"%c",'\0');
Maat_burn_feather(common->feather);
common->maat_init_type = TENSOR_MAAT_INIT_TYPE_RELI;
common->maat_handle_state = TENSOR_MAAT_HANDLE_CHANGED;
maat_redis_init(common);
MESA_HANDLE_RUNTIME_LOG (common->error_logger, RLOG_LV_FATAL, "[RELIABILITY]",
"Redis is changed!New redis ip is:%s",
common->redis_ip);
return 0;
}
int tensor_redis_state_check(struct common_module_t *common)
{
int i,ret = 0;
struct WLB_consumer_t redis_ip_list[MAX_REDIS_IP_NUM];
set<string> redis_ip_set;
set<string>::iterator set_it;
map<string,string>::iterator it;
map<string,string> *redis_level_map = NULL;
char level_string[MAX_FZX_LEVEL_PATH_LENGTH+1] = "";
ret = wiredLB_list(common->wlb_handle,MAX_REDIS_IP_NUM,redis_ip_list);
redis_level_map = new map<string,string>;
for(i = 0; i < ret;i++)
{
redis_ip_set.insert(redis_ip_list[i].ip_addr);
//TODO:work balance is not implement
//record judian ip,the same judian only the insert first ip will be reserved in the map
string level = redis_ip_list[i].user_tag;
string ip_addr = redis_ip_list[i].ip_addr;
redis_level_map->insert(std::pair<string,string>(level,ip_addr));
//record fenzhongxin ip,the same fenzhongxin only the first insert ip will be reserved in the map
if(common->run_mode == CONSUMER_MODE)
{
memcpy(level_string,redis_ip_list[i].user_tag,MAX_FZX_LEVEL_PATH_LENGTH);
level = level_string;
redis_level_map->insert(std::pair<string,string>(level,ip_addr));
}
}
//Search by level from old ip==>judian==>fenzhongxin
//if((it = redis_ip_set.find(common->redis_ip)) && ( it != redis_ip_set.end()))
set_it = redis_ip_set.find(common->redis_ip);
if(set_it != redis_ip_set.end())
{
common->maat_handle_state = TENSOR_MAAT_HANDLE_NOT_CHANGE;
MESA_HANDLE_RUNTIME_LOG (common->runtime_logger, RLOG_LV_FATAL, "[RELIABILITY]",
"Redis is OK ip is:%s",common->redis_ip);
}
else
{
it = redis_level_map->find(common->level);
if(it != redis_level_map->end())
{
tensor_change_redis(it,common);
}
else
{
memcpy(level_string,common->level,MAX_FZX_LEVEL_PATH_LENGTH);
it = redis_level_map->find(level_string);
if(it != redis_level_map->end())
{
tensor_change_redis(it,common);
}
else
{
common->maat_handle_state = TENSOR_MAAT_HANDLE_NOT_CHANGE;
MESA_HANDLE_RUNTIME_LOG (common->error_logger, RLOG_LV_FATAL, "[RELIABILITY]",
"Redis is out of service!Old redis ip is:%s",
common->redis_ip);
}
}
}
delete redis_level_map;
return 0;
}
|