#include #include #include #include #include #include #include #include #include #include #include #include struct app_event_handler { TAILQ_ENTRY(app_event_handler) next; app_event_handler_t cb; void * arg; }; /* 控制面监测线程 */ struct app_main { TAILQ_HEAD(app_list, app) app_list; TAILQ_HEAD(app_event_handler_list, app_event_handler) event_cb[APP_EV_TYPE_MAX]; /* 保护APP_LIST,该链表可能被应用管理线程和状态监控线程同时访问 */ pthread_mutex_t app_list_lock; }; /* 新连接处理函数 新连接到达,表明应用即将注册。在新连接处理程序中完成app存储空间的分配。 */ static int __conn_new_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg) { struct app * app_object = ZMALLOC(sizeof(struct app)); struct app_main * app_main = (struct app_main *)arg; MR_VERIFY_MALLOC(app_object); ct_conn->pme = app_object; app_object->conn = ct_conn; pthread_mutex_lock(&app_main->app_list_lock); TAILQ_INSERT_TAIL(&app_main->app_list, app_object, next); pthread_mutex_unlock(&app_main->app_list_lock); MR_DEBUG("APP module: new ctrlmsg connection arrived. "); return 0; } static int __conn_close_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg) { struct app * app_object = (struct app *)ct_conn->pme; struct app_main * app_main = (struct app_main *)arg; assert(app_object != NULL); assert(app_main != NULL); /* 对于没有标记注册的链接,不用回调应用结束处理函数,释放即可 */ if (!app_object->registered) goto free_object; /* 链接终止,应用结束。调用应用结束事件处理回调函数 */ struct app_event_handler * ev_handler_iter; TAILQ_FOREACH(ev_handler_iter, &app_main->event_cb[APP_EV_TYPE_UNREGISTER], next) { ev_handler_iter->cb(app_main, app_object, arg); } /* 应用退出前的操作,删除状态监测文件 */ unlink(app_object->mntfile); MR_INFO("Application %s unregisted.", app_object->symbol); free_object: pthread_mutex_lock(&app_main->app_list_lock); TAILQ_REMOVE(&app_main->app_list, app_object, next); pthread_mutex_unlock(&app_main->app_list_lock); FREE(app_object); return 0; } static int __app_reigster_msg_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg) { struct app * app_object = (struct app *)ct_conn->pme; struct app_main * app_main = (struct app_main *)arg; assert(app_object != NULL); assert(app_main != NULL); struct ctrl_msg_app_reg_request * reg_msg = (struct ctrl_msg_app_reg_request *)msg; struct ctrl_msg_app_reg_response rep_msg; memset(&rep_msg, 0, sizeof(rep_msg)); /* 响应头部构建 */ ctrl_msg_header_construct(&rep_msg.msg_header, sizeof(rep_msg), CTRL_MSG_TYPE_RESPONSE, CTRLMSG_TOPIC_APP_REGISTER); /* 检查应用名称是否重复 */ struct app * app_iter = NULL; pthread_mutex_lock(&app_main->app_list_lock); TAILQ_FOREACH(app_iter, &app_main->app_list, next) { if (strncmp(app_iter->symbol, (const char *)(reg_msg->symbol), sizeof(app_object->symbol)) == 0) { pthread_mutex_unlock(&app_main->app_list_lock); goto err_dup_app; } } strncpy(app_object->symbol, (const char *)(reg_msg->symbol), sizeof(app_object->symbol)); strncpy(app_object->mntfile, (const char *)(reg_msg->mntfile), sizeof(app_object->mntfile)); app_object->pid = reg_msg->pid; app_object->registered = 1; pthread_mutex_unlock(&app_main->app_list_lock); /* 调用应用注册事件处理回调函数 */ struct app_event_handler * ev_handler_iter; TAILQ_FOREACH(ev_handler_iter, &app_main->event_cb[APP_EV_TYPE_REGISTER], next) { ev_handler_iter->cb(app_main, app_object, arg); } /* 构建应答消息,成功注册 */ rep_msg.msg_err.errcode = RT_SUCCESS; strncpy((char *)rep_msg.symbol, app_object->symbol, sizeof(rep_msg.symbol)); ctrlmsg_msg_send(ct_hand, ct_conn, &rep_msg.msg_header); MR_INFO("Application %s registed.", app_object->symbol); return RT_SUCCESS; err_dup_app: /* 错误,重复的应用注册 */ rep_msg.msg_err.errcode = RT_ERR; snprintf((char *)rep_msg.msg_err.strerr, sizeof(rep_msg.msg_err.strerr), "App %s already registed. Failed.", (char *)reg_msg->symbol); strncpy((char *)rep_msg.symbol, (const char *)reg_msg->symbol, sizeof(rep_msg.symbol)); MR_INFO("%s", (char *)rep_msg.msg_err.strerr); ctrlmsg_msg_send(ct_hand, ct_conn, &rep_msg.msg_header); return 0; } int app_main_init(struct sc_main * sc) { sc->app_main = ZMALLOC(sizeof(struct app_main)); MR_VERIFY_MALLOC(sc->app_main); TAILQ_INIT(&sc->app_main->app_list); for (int i = 0; i < RTE_DIM(sc->app_main->event_cb); i++) { TAILQ_INIT(&sc->app_main->event_cb[i]); } ctrlmsg_event_conn_new_register(sc->ctrlmsg_handler, __conn_new_handler, sc->app_main); ctrlmsg_event_conn_close_register(sc->ctrlmsg_handler, __conn_close_handler, sc->app_main); /* 应用注册消息处理函数,没有应用反注册消息处理函数,应用反注册由连接断开信号完成。*/ ctrlmsg_msg_reciver_register(sc->ctrlmsg_handler, CTRLMSG_TOPIC_APP_REGISTER, CTRL_MSG_TYPE_REQUEST, __app_reigster_msg_handler, sc->app_main); return RT_SUCCESS; } cJSON *app_monit_loop(struct sc_main * sc) { struct app_main * app_main = sc->app_main; struct cJSON * j_app_array = cJSON_CreateArray(); pthread_mutex_lock(&app_main->app_list_lock); struct app * app_iter = NULL; TAILQ_FOREACH(app_iter, &app_main->app_list, next) { struct cJSON * j_app_object = cJSON_CreateObject(); cJSON_AddStringToObject(j_app_object, "symbol", app_iter->symbol); cJSON_AddNumberToObject(j_app_object, "registed", app_iter->registered); cJSON_AddNumberToObject(j_app_object, "pid", app_iter->pid); cJSON_AddItemToArray(j_app_array, j_app_object); } pthread_mutex_unlock(&app_main->app_list_lock); return j_app_array; } void app_event_handler_register(struct app_main * app_main, enum app_event_type event_type, app_event_handler_t fn_handler, void * arg) { struct app_event_handler * handler = ZMALLOC(sizeof(struct app_event_handler)); MR_VERIFY_MALLOC(handler); handler->arg = arg; handler->cb = fn_handler; TAILQ_INSERT_TAIL(&app_main->event_cb[event_type], handler, next); return; }