#include #include #include #include #include #include #include #include #include struct sc_main * g_sc_main; struct app_event_handler { TAILQ_ENTRY(app_event_handler) next; app_event_handler_t cb; void * arg; }; /* 控制面监测线程 */ struct app_main { TAILQ_HEAD(app_list, app) app_list; TAILQ_HEAD(app_event_handler_list, app_event_handler) event_cb[APP_EV_TYPE_MAX]; int epoll_fd; /* 保护APP_LIST,该链表可能被应用管理线程和状态监控线程同时访问 */ pthread_mutex_t app_list_lock; }; static int __deconstruct_app(struct app * app_object) { struct app_main * app_main = sc_main_get()->app_main; assert(app_object != NULL); assert(app_main != NULL); /* 对于没有标记注册的链接,不用回调应用结束处理函数,释放即可 */ if (!app_object->registered) goto free_object; /* 链接终止,应用结束。调用应用结束事件处理回调函数 */ struct app_event_handler * ev_handler_iter; TAILQ_FOREACH(ev_handler_iter, &app_main->event_cb[APP_EV_TYPE_UNREGISTER], next) { ev_handler_iter->cb(app_main, app_object, ev_handler_iter->arg); } /* 应用退出前的操作,删除状态监测文件 */ unlink(app_object->mntfile); MR_INFO("Application %s unregisted.", app_object->symbol); epoll_del_event(app_main->epoll_fd, app_object->fd, EPOLLIN); close(app_object->fd); free_object: pthread_mutex_lock(&app_main->app_list_lock); TAILQ_REMOVE(&app_main->app_list, app_object, next); pthread_mutex_unlock(&app_main->app_list_lock); FREE(app_object); app_object = NULL; return 0; } // monitor app survival void * apps_survival_monitor(void * arg) { pthread_detach(pthread_self()); mr_thread_setname(pthread_self(), "apps_survival"); int epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (epoll_fd < 0) { MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno)); return NULL; } struct app_main * app_main = sc_main_get()->app_main; app_main->epoll_fd = epoll_fd; struct epoll_event evlist[__EV_MAX_EVENTS]; while (1) { int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, -1); if (ret == -1 && errno == EINTR) continue; else if (ret == -1) { MR_ERROR("loop thread failed : %s", strerror(errno)); return NULL; } for (int i = 0; i < ret; i++) { int fd = evlist[i].data.fd; struct app * app_iter = app_lookup_by_fd(sc_main_get(), fd); /* 异常链接 */ if (evlist[i].events & (EPOLLHUP | EPOLLERR | EPOLLIN)) { __deconstruct_app(app_iter); } } } } int __send_app_register_response(const struct rte_mp_msg * msg, const void * peer, const char * symbol, bool is_success, const char * strerr) { struct app_register_resp * rep_content = ZMALLOC(sizeof(struct app_register_resp)); snprintf((char *)rep_content->symbol, sizeof(rep_content->symbol), "%s", symbol); if (is_success) { rep_content->errcode = RT_SUCCESS; } else { rep_content->errcode = RT_ERR; snprintf((char *)rep_content->strerr, sizeof(rep_content->strerr), "App %s registration failed:%s", symbol, strerr); } struct rte_mp_msg rep_msg = {}; snprintf(rep_msg.name, sizeof(rep_msg.name), "%s", msg->name); uintptr_t ptr_address = (uintptr_t)rep_content; rep_msg.len_param = sizeof(uintptr_t); memcpy(rep_msg.param, &ptr_address, sizeof(ptr_address)); int ret = rte_mp_reply(&rep_msg, peer); if (ret < 0) { MR_WARNING("Failed to execute rte_mp_reply:%s", rte_strerror(rte_errno)); return -1; } return 0; } int __instance_alive_handler(const struct rte_mp_msg * msg, const void * peer) { char strerr[MR_STRING_MAX] = {0}; uintptr_t stored_ptr_address; memcpy(&stored_ptr_address, msg->param, sizeof(uintptr_t)); struct app_register_req * reg_msg = (struct app_register_req *)stored_ptr_address; struct app_main * app_main = sc_main_get()->app_main; // Check if app names are duplicates struct app * app_iter = NULL; pthread_mutex_lock(&app_main->app_list_lock); TAILQ_FOREACH(app_iter, &app_main->app_list, next) { if (strncmp(app_iter->symbol, (const char *)(reg_msg->symbol), strlen(app_iter->symbol)) == 0) { pthread_mutex_unlock(&app_main->app_list_lock); close(msg->fds[0]); snprintf(strerr, sizeof(strerr), "%s has registed.", app_iter->symbol); MR_WARNING(strerr); goto error; } } // construct app struct app * app_object = ZMALLOC(sizeof(struct app)); snprintf(app_object->symbol, sizeof(app_object->symbol), "%s", (const char *)(reg_msg->symbol)); snprintf(app_object->mntfile, sizeof(app_object->mntfile), "%s", (const char *)(reg_msg->mntfile)); app_object->pid = reg_msg->pid; app_object->fd = msg->fds[0]; // Call the application registration event processing callback function struct app_event_handler * ev_handler_iter; TAILQ_FOREACH(ev_handler_iter, &app_main->event_cb[APP_EV_TYPE_REGISTER], next) { ev_handler_iter->cb(app_main, app_object, ev_handler_iter->arg); } app_object->registered = 1; TAILQ_INSERT_TAIL(&app_main->app_list, app_object, next); pthread_mutex_unlock(&app_main->app_list_lock); epoll_add_event(app_main->epoll_fd, app_object->fd, EPOLLIN); __send_app_register_response(msg, peer, (const char *)(reg_msg->symbol), true, NULL); MR_INFO("Application %s registe successfully.", app_object->symbol); return 0; error: __send_app_register_response(msg, peer, (const char *)(reg_msg->symbol), false, strerr); return 0; } int app_main_init(struct sc_main * sc) { sc->app_main = ZMALLOC(sizeof(struct app_main)); MR_VERIFY_MALLOC(sc->app_main); TAILQ_INIT(&sc->app_main->app_list); for (int i = 0; i < RTE_DIM(sc->app_main->event_cb); i++) { TAILQ_INIT(&sc->app_main->event_cb[i]); } pthread_t __pid = 0; int ret = pthread_create(&__pid, NULL, apps_survival_monitor, NULL); if (ret != 0) { MR_ERROR("Launch monitor_marsio_survival thread failed : %s", strerror(ret)); return RT_ERR; } rte_mp_action_register("instance_alive_register", __instance_alive_handler); return RT_SUCCESS; } struct app * app_lookup_by_symbol(struct sc_main * sc, const char * appsym) { struct app_main * app_main = sc->app_main; struct app * app_iter = NULL; pthread_mutex_lock(&app_main->app_list_lock); TAILQ_FOREACH(app_iter, &app_main->app_list, next) { if (strncmp(app_iter->symbol, appsym, sizeof(app_iter->symbol)) == 0) break; } pthread_mutex_unlock(&app_main->app_list_lock); return app_iter; } struct app * app_lookup_by_fd(struct sc_main * sc, int fd) { struct app_main * app_main = sc->app_main; struct app * app_iter = NULL; pthread_mutex_lock(&app_main->app_list_lock); TAILQ_FOREACH(app_iter, &app_main->app_list, next) { if (app_iter->fd == fd) break; } pthread_mutex_unlock(&app_main->app_list_lock); return app_iter; } cJSON * app_monit_loop(struct sc_main * sc) { struct app_main * app_main = sc->app_main; struct cJSON * j_app_array = cJSON_CreateArray(); pthread_mutex_lock(&app_main->app_list_lock); struct app * app_iter = NULL; TAILQ_FOREACH(app_iter, &app_main->app_list, next) { struct cJSON * j_app_object = cJSON_CreateObject(); cJSON_AddStringToObject(j_app_object, "symbol", app_iter->symbol); cJSON_AddNumberToObject(j_app_object, "registed", app_iter->registered); cJSON_AddNumberToObject(j_app_object, "pid", app_iter->pid); cJSON_AddItemToArray(j_app_array, j_app_object); } pthread_mutex_unlock(&app_main->app_list_lock); return j_app_array; } void app_event_handler_register(struct app_main * app_main, enum app_event_type event_type, app_event_handler_t fn_handler, void * arg) { struct app_event_handler * handler = ZMALLOC(sizeof(struct app_event_handler)); MR_VERIFY_MALLOC(handler); handler->arg = arg; handler->cb = fn_handler; TAILQ_INSERT_TAIL(&app_main->event_cb[event_type], handler, next); return; }