1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
#include <common.h>
#include <sys/socket.h>
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <sc_common.h>
#include <rte_malloc.h>
#include <ctrlmsg.h>
#include <ctrlmsg_define.h>
#include <sc_app.h>
#include <cJSON.h>
struct app_event_handler
{
TAILQ_ENTRY(app_event_handler) next;
app_event_handler_t cb;
void * arg;
};
/* 控制面监测线程 */
struct app_main
{
TAILQ_HEAD(app_list, app) app_list;
TAILQ_HEAD(app_event_handler_list, app_event_handler) event_cb[APP_EV_TYPE_MAX];
/* 保护APP_LIST,该链表可能被应用管理线程和状态监控线程同时访问 */
pthread_mutex_t app_list_lock;
};
/* 新连接处理函数
新连接到达,表明应用即将注册。在新连接处理程序中完成app存储空间的分配。
*/
static int __conn_new_handler(struct ctrlmsg_handler * ct_hand,
struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg)
{
struct app * app_object = ZMALLOC(sizeof(struct app));
struct app_main * app_main = (struct app_main *)arg;
MR_VERIFY_MALLOC(app_object);
ct_conn->pme = app_object;
app_object->conn = ct_conn;
pthread_mutex_lock(&app_main->app_list_lock);
TAILQ_INSERT_TAIL(&app_main->app_list, app_object, next);
pthread_mutex_unlock(&app_main->app_list_lock);
MR_DEBUG("APP module: new ctrlmsg connection arrived. ");
return 0;
}
static int __conn_close_handler(struct ctrlmsg_handler * ct_hand,
struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg)
{
struct app * app_object = (struct app *)ct_conn->pme;
struct app_main * app_main = (struct app_main *)arg;
assert(app_object != NULL);
assert(app_main != NULL);
/* 对于没有标记注册的链接,不用回调应用结束处理函数,释放即可 */
if (!app_object->registered) goto free_object;
/* 链接终止,应用结束。调用应用结束事件处理回调函数 */
struct app_event_handler * ev_handler_iter;
TAILQ_FOREACH(ev_handler_iter, &app_main->event_cb[APP_EV_TYPE_UNREGISTER], next)
{
ev_handler_iter->cb(app_main, app_object, arg);
}
/* 应用退出前的操作,删除状态监测文件 */
unlink(app_object->mntfile);
MR_INFO("Application %s unregisted.", app_object->symbol);
free_object:
pthread_mutex_lock(&app_main->app_list_lock);
TAILQ_REMOVE(&app_main->app_list, app_object, next);
pthread_mutex_unlock(&app_main->app_list_lock);
FREE(app_object);
return 0;
}
static int __app_reigster_msg_handler(struct ctrlmsg_handler * ct_hand,
struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg)
{
struct app * app_object = (struct app *)ct_conn->pme;
struct app_main * app_main = (struct app_main *)arg;
assert(app_object != NULL);
assert(app_main != NULL);
struct ctrl_msg_app_reg_request * reg_msg = (struct ctrl_msg_app_reg_request *)msg;
struct ctrl_msg_app_reg_response rep_msg;
memset(&rep_msg, 0, sizeof(rep_msg));
/* 响应头部构建 */
ctrl_msg_header_construct(&rep_msg.msg_header, sizeof(rep_msg), CTRL_MSG_TYPE_RESPONSE,
CTRLMSG_TOPIC_APP_REGISTER);
/* 检查应用名称是否重复 */
struct app * app_iter = NULL;
pthread_mutex_lock(&app_main->app_list_lock);
TAILQ_FOREACH(app_iter, &app_main->app_list, next)
{
if (strncmp(app_iter->symbol, (const char *)(reg_msg->symbol),
sizeof(app_object->symbol)) == 0)
{
pthread_mutex_unlock(&app_main->app_list_lock);
goto err_dup_app;
}
}
strncpy(app_object->symbol, (const char *)(reg_msg->symbol), sizeof(app_object->symbol));
strncpy(app_object->mntfile, (const char *)(reg_msg->mntfile), sizeof(app_object->mntfile));
app_object->pid = reg_msg->pid;
app_object->registered = 1;
pthread_mutex_unlock(&app_main->app_list_lock);
/* 调用应用注册事件处理回调函数 */
struct app_event_handler * ev_handler_iter;
TAILQ_FOREACH(ev_handler_iter, &app_main->event_cb[APP_EV_TYPE_REGISTER], next)
{
ev_handler_iter->cb(app_main, app_object, arg);
}
/* 构建应答消息,成功注册 */
rep_msg.msg_err.errcode = RT_SUCCESS;
strncpy((char *)rep_msg.symbol, app_object->symbol, sizeof(rep_msg.symbol));
ctrlmsg_msg_send(ct_hand, ct_conn, &rep_msg.msg_header);
MR_INFO("Application %s registed.", app_object->symbol);
return RT_SUCCESS;
err_dup_app: /* 错误,重复的应用注册 */
rep_msg.msg_err.errcode = RT_ERR;
snprintf((char *)rep_msg.msg_err.strerr, sizeof(rep_msg.msg_err.strerr),
"App %s already registed. Failed.", (char *)reg_msg->symbol);
strncpy((char *)rep_msg.symbol, (const char *)reg_msg->symbol, sizeof(rep_msg.symbol));
MR_INFO("%s", (char *)rep_msg.msg_err.strerr);
ctrlmsg_msg_send(ct_hand, ct_conn, &rep_msg.msg_header);
return 0;
}
int app_main_init(struct sc_main * sc)
{
sc->app_main = ZMALLOC(sizeof(struct app_main));
MR_VERIFY_MALLOC(sc->app_main);
TAILQ_INIT(&sc->app_main->app_list);
for (int i = 0; i < RTE_DIM(sc->app_main->event_cb); i++)
{
TAILQ_INIT(&sc->app_main->event_cb[i]);
}
ctrlmsg_event_conn_new_register(sc->ctrlmsg_handler, __conn_new_handler, sc->app_main);
ctrlmsg_event_conn_close_register(sc->ctrlmsg_handler, __conn_close_handler, sc->app_main);
/* 应用注册消息处理函数,没有应用反注册消息处理函数,应用反注册由连接断开信号完成。*/
ctrlmsg_msg_reciver_register(sc->ctrlmsg_handler, CTRLMSG_TOPIC_APP_REGISTER,
CTRL_MSG_TYPE_REQUEST, __app_reigster_msg_handler, sc->app_main);
return RT_SUCCESS;
}
cJSON *app_monit_loop(struct sc_main * sc)
{
struct app_main * app_main = sc->app_main;
struct cJSON * j_app_array = cJSON_CreateArray();
pthread_mutex_lock(&app_main->app_list_lock);
struct app * app_iter = NULL;
TAILQ_FOREACH(app_iter, &app_main->app_list, next)
{
struct cJSON * j_app_object = cJSON_CreateObject();
cJSON_AddStringToObject(j_app_object, "symbol", app_iter->symbol);
cJSON_AddNumberToObject(j_app_object, "registed", app_iter->registered);
cJSON_AddNumberToObject(j_app_object, "pid", app_iter->pid);
cJSON_AddItemToArray(j_app_array, j_app_object);
}
pthread_mutex_unlock(&app_main->app_list_lock);
return j_app_array;
}
void app_event_handler_register(struct app_main * app_main, enum app_event_type event_type,
app_event_handler_t fn_handler, void * arg)
{
struct app_event_handler * handler = ZMALLOC(sizeof(struct app_event_handler));
MR_VERIFY_MALLOC(handler);
handler->arg = arg;
handler->cb = fn_handler;
TAILQ_INSERT_TAIL(&app_main->event_cb[event_type], handler, next);
return;
}
|