diff options
| author | Qiuwen Lu <[email protected]> | 2017-10-20 14:30:24 +0800 |
|---|---|---|
| committer | Qiuwen Lu <[email protected]> | 2017-10-20 14:30:24 +0800 |
| commit | 9370fdec4c2f84070d85cf6c74a1fe58eb5978d1 (patch) | |
| tree | ee4dfabaedf08f61370eed9d7de9a8d0efb00749 | |
| parent | 8d3bd791d0992f87ae0cf7e114a1c356e3e7d45b (diff) | |
增加控制线程(控制指令通信)保活功能
- 增加控制线程(控制指令通信)保活功能,定期喂狗。当控制线程死锁时,systemd看门狗将进程杀死。
| -rw-r--r-- | infra/include/ctrlmsg.h | 6 | ||||
| -rw-r--r-- | infra/src/ctrlmsg.c | 656 | ||||
| -rw-r--r-- | service/include/sc_common.h | 5 | ||||
| -rw-r--r-- | service/src/core.c | 39 | ||||
| -rw-r--r-- | tools/systemd/mrzcpd.service.in | 1 |
5 files changed, 380 insertions, 327 deletions
diff --git a/infra/include/ctrlmsg.h b/infra/include/ctrlmsg.h index 68c2ecf..c33c7dc 100644 --- a/infra/include/ctrlmsg.h +++ b/infra/include/ctrlmsg.h @@ -99,6 +99,8 @@ struct ctrlmsg_handler int event_fd; /* 监听FD */ int listen_fd; + /* 保活间隔 */ + int timeout; /* 有效连接链表 */ struct ctrlmsg_conn_list conn_list; /* 回调函数列表 */ @@ -109,8 +111,8 @@ struct ctrlmsg_handler struct ctrlmsg_cb_list cb_conn_close_list; }; -struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode, - struct sockaddr_in sockaddr, void * arg); +struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode, struct sockaddr_in sockaddr, + void * arg, int timeout); int ctrlmsg_thread_launch(struct ctrlmsg_handler * handler); diff --git a/infra/src/ctrlmsg.c b/infra/src/ctrlmsg.c index 060ac1e..aedd4c1 100644 --- a/infra/src/ctrlmsg.c +++ b/infra/src/ctrlmsg.c @@ -19,13 +19,15 @@ #include <arpa/inet.h> #include <errno.h> +#include <systemd/sd-daemon.h> + #define __EV_MAX_EVENTS 16 #define __EV_MAX_BUFFER 2048 #define __CONN_MAX_BUFFER 8192 static void __cleanup_unlock_mutex(pthread_mutex_t ** lock) { - pthread_mutex_unlock(*lock); + pthread_mutex_unlock(*lock); } #define __MUTEX_LOCK(lock) \ @@ -35,52 +37,52 @@ static void __cleanup_unlock_mutex(pthread_mutex_t ** lock) static int __tcp_setnonblock(int sockfd) { - int flags = fcntl(sockfd, F_GETFL, 0); - if (flags == -1) return -1; - int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK, 0); - if (ret == -1) return -1; - return 0; + int flags = fcntl(sockfd, F_GETFL, 0); + if (flags == -1) return -1; + int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK, 0); + if (ret == -1) return -1; + return 0; } static int __update_epoll_event(int epoll_fd, int epoll_mode, int fd, int epoll_type) { - struct epoll_event event = { 0 }; - event.data.fd = fd; - event.events = epoll_type; + struct epoll_event event = { 0 }; + event.data.fd = fd; + event.events = epoll_type; - __tcp_setnonblock(fd); - if (-1 == epoll_ctl(epoll_fd, epoll_mode, fd, &event)) - { - MR_ERROR("update epoll event failed : %s\n", strerror(errno)); return -1; - } + __tcp_setnonblock(fd); + if (-1 == epoll_ctl(epoll_fd, epoll_mode, fd, &event)) + { + MR_ERROR("update epoll event failed : %s\n", strerror(errno)); return -1; + } - return 0; + return 0; } int __epoll_add_event(int epoll_fd, int sd, int epoll_type) { - return __update_epoll_event(epoll_fd, EPOLL_CTL_ADD, sd, epoll_type); + return __update_epoll_event(epoll_fd, EPOLL_CTL_ADD, sd, epoll_type); } int __epoll_del_event(int epoll_fd, int sd, int epoll_type) { - return __update_epoll_event(epoll_fd, EPOLL_CTL_DEL, sd, epoll_type); + return __update_epoll_event(epoll_fd, EPOLL_CTL_DEL, sd, epoll_type); } int __epoll_mod_event(int epoll_fd, int sd, int epoll_type) { - return __update_epoll_event(epoll_fd, EPOLL_CTL_MOD, sd, epoll_type); + return __update_epoll_event(epoll_fd, EPOLL_CTL_MOD, sd, epoll_type); } struct ctrlmsg_conn * __lookup_conn_by_fd(struct ctrlmsg_handler * handler, int fd) { - struct ctrlmsg_conn * conn_iter = NULL; - TAILQ_FOREACH(conn_iter, &handler->conn_list, next) - { - if (conn_iter->conn_fd == fd) return conn_iter; - } + struct ctrlmsg_conn * conn_iter = NULL; + TAILQ_FOREACH(conn_iter, &handler->conn_list, next) + { + if (conn_iter->conn_fd == fd) return conn_iter; + } - return NULL; + return NULL; } enum parse_status @@ -101,7 +103,7 @@ static int parse_line(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * co unsigned int * m_checked_idx = &conn->rcv_buffer_checked; unsigned int * m_read_idx = &conn->rcv_buffer_used; char * m_read_buf = conn->rcv_buffer; - + for (; *m_checked_idx < *m_read_idx; (*m_checked_idx)++) { char temp = conn->rcv_buffer[conn->rcv_buffer_checked]; @@ -125,7 +127,7 @@ static int parse_line(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * co { if ((*m_checked_idx > 1) && (m_read_buf[*m_checked_idx - 1] == '\r')) { - m_read_buf[(*m_checked_idx)-1] = '\0'; + m_read_buf[(*m_checked_idx) - 1] = '\0'; m_read_buf[(*m_checked_idx)++] = '\0'; return PARSE_OK; } @@ -142,10 +144,10 @@ static int parse_content_json(struct ctrlmsg_handler * handler, struct ctrlmsg_c return 0; } -static int parse_content_binary(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn, - int epoll_fd, int fd) +static int parse_content_binary(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn, + int epoll_fd, int fd) { - /* 缓冲区的数据还不够头部,退出循环 */ + /* 缓冲区的数据还不够头部,退出循环 */ while (conn->rcv_buffer_used >= sizeof(struct ctrl_msg_header)) { /* 缓冲区已经收到了头部,但数据长度不足报文声明的长度,数据不全 */ @@ -179,110 +181,110 @@ static int parse_content_binary(struct ctrlmsg_handler * handler, struct ctrlmsg } /* 处理新建的监听连接 */ -void __handle_new_conn(struct ctrlmsg_handler * handler, - int epoll_fd, int conn_fd, struct sockaddr_in remote_addr) +void __handle_new_conn(struct ctrlmsg_handler * handler, + int epoll_fd, int conn_fd, struct sockaddr_in remote_addr) { - /* 分配新的APP-CONN结构体,只保存链接信息,不创建应用信息 */ - struct ctrlmsg_conn * conn = LIBC_MALLOC(sizeof(struct ctrlmsg_conn)); - MR_VERIFY_MALLOC(conn); - - conn->conn_fd = conn_fd; - conn->peer_sockaddr_in = remote_addr; - conn->rcv_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER); - conn->rcv_buffer_max = __CONN_MAX_BUFFER; - conn->rcv_buffer_used = 0; - conn->snd_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER); - conn->snd_buffer_max = __CONN_MAX_BUFFER; - conn->snd_buffer_used = 0; + /* 分配新的APP-CONN结构体,只保存链接信息,不创建应用信息 */ + struct ctrlmsg_conn * conn = LIBC_MALLOC(sizeof(struct ctrlmsg_conn)); + MR_VERIFY_MALLOC(conn); + + conn->conn_fd = conn_fd; + conn->peer_sockaddr_in = remote_addr; + conn->rcv_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER); + conn->rcv_buffer_max = __CONN_MAX_BUFFER; + conn->rcv_buffer_used = 0; + conn->snd_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER); + conn->snd_buffer_max = __CONN_MAX_BUFFER; + conn->snd_buffer_used = 0; conn->snd_buffer_deal = 0; - MR_VERIFY_MALLOC(conn->rcv_buffer); - MR_VERIFY_MALLOC(conn->snd_buffer); + MR_VERIFY_MALLOC(conn->rcv_buffer); + MR_VERIFY_MALLOC(conn->snd_buffer); /* 初始化锁,使用递归锁 */ pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&conn->mutex, &mutex_attr); - - /* 加入链接表中 */ - TAILQ_INSERT_TAIL(&handler->conn_list, conn, next); - - /* 调用新链接回调函数 */ - struct ctrlmsg_cb * cb_object; - TAILQ_FOREACH(cb_object, &handler->cb_conn_new_list, next) - { - cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg); - } - - __epoll_add_event(epoll_fd, conn_fd, EPOLLIN); - return; + + /* 加入链接表中 */ + TAILQ_INSERT_TAIL(&handler->conn_list, conn, next); + + /* 调用新链接回调函数 */ + struct ctrlmsg_cb * cb_object; + TAILQ_FOREACH(cb_object, &handler->cb_conn_new_list, next) + { + cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg); + } + + __epoll_add_event(epoll_fd, conn_fd, EPOLLIN); + return; } // 处理链接关闭信息 void __handle_close(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn, - int epoll_fd, int fd) + int epoll_fd, int fd) { - /* 调用链接终止回调函数 */ - struct ctrlmsg_cb * cb_object; - TAILQ_FOREACH(cb_object, &handler->cb_conn_close_list, next) - { - cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg); - } - - __epoll_del_event(epoll_fd, fd, EPOLLIN); - close(fd); - - TAILQ_REMOVE(&handler->conn_list, conn, next); - free(conn->rcv_buffer); - free(conn->snd_buffer); - free(conn); - return; + /* 调用链接终止回调函数 */ + struct ctrlmsg_cb * cb_object; + TAILQ_FOREACH(cb_object, &handler->cb_conn_close_list, next) + { + cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg); + } + + __epoll_del_event(epoll_fd, fd, EPOLLIN); + close(fd); + + TAILQ_REMOVE(&handler->conn_list, conn, next); + free(conn->rcv_buffer); + free(conn->snd_buffer); + free(conn); + return; } /* 处理监听连接到达的数据 */ void __handle_read(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn, - int epoll_fd, int fd) + int epoll_fd, int fd) { __MUTEX_LOCK(&conn->mutex); - for (;;) - { - int ret = recv(fd, RTE_PTR_ADD(conn->rcv_buffer, conn->rcv_buffer_used), - conn->rcv_buffer_max - conn->rcv_buffer_used, 0); - - if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) - { - break; - } - - if (ret < 0) - { - MR_ERROR("File descripter recv error: %s", strerror(errno)); - __handle_close(handler, conn, epoll_fd, fd); - return; - } - - if (ret == 0) - { - __handle_close(handler, conn, epoll_fd, fd); - return; - } - - conn->rcv_buffer_used += ret; - } - - parse_content_binary(handler, conn, epoll_fd, fd); - return; + for (;;) + { + int ret = recv(fd, RTE_PTR_ADD(conn->rcv_buffer, conn->rcv_buffer_used), + conn->rcv_buffer_max - conn->rcv_buffer_used, 0); + + if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) + { + break; + } + + if (ret < 0) + { + MR_ERROR("File descripter recv error: %s", strerror(errno)); + __handle_close(handler, conn, epoll_fd, fd); + return; + } + + if (ret == 0) + { + __handle_close(handler, conn, epoll_fd, fd); + return; + } + + conn->rcv_buffer_used += ret; + } + + parse_content_binary(handler, conn, epoll_fd, fd); + return; } void __handle_write(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn, - int epoll_fd, int fd) + int epoll_fd, int fd) { - int ret = 0; + int ret = 0; __MUTEX_LOCK(&conn->mutex); - do { - ret = send(fd, conn->snd_buffer, conn->snd_buffer_used, MSG_NOSIGNAL); + do { + ret = send(fd, conn->snd_buffer, conn->snd_buffer_used, MSG_NOSIGNAL); } while ((ret == -1 && (errno == EINTR))); conn->snd_buffer_deal += ret; @@ -294,182 +296,200 @@ void __handle_write(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn conn->snd_buffer_deal = 0; __epoll_mod_event(handler->epoll_fd, conn->conn_fd, EPOLLIN); } - - return; + + return; } +void __handle_timeout(struct ctrlmsg_handler * handler) +{ + /* 向systemd的watchdog喂狗 */ + /* TODO: 不应该在这里做这件事,应该在外面,通过回调函数调用 */ + + if (sd_watchdog_enabled(0, NULL)) + { + sd_notify(0, "WATCHDOG=1"); + } + + return; +} static void * __epoll_event_loop(void * arg) { - struct ctrlmsg_handler * handler = (struct ctrlmsg_handler *)arg; - struct epoll_event evlist[__EV_MAX_EVENTS]; - - int epoll_fd = handler->epoll_fd; - int listen_fd = handler->listen_fd; - - /* 对于客户端模式,没有监听的端口 */ - if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT) listen_fd = -1; - - while (1) - { - int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, -1); - if (ret == -1 && errno == EINTR) continue; - else if (ret == -1) - { - MR_ERROR("Ctrlmsg event loop thread failed :waiting on epoll fd : %s", - strerror(errno)); goto errout; - } - - for (int i = 0; i < ret; i++) - { - int fd = evlist[i].data.fd; - - /* 新连接,仅对服务器模式有效 */ - if (fd == listen_fd && (evlist[i].events & EPOLLIN)) - { - struct sockaddr_in remote_addr; - socklen_t sz_remote_addr = sizeof(remote_addr); + struct ctrlmsg_handler * handler = (struct ctrlmsg_handler *)arg; + struct epoll_event evlist[__EV_MAX_EVENTS]; + + int epoll_fd = handler->epoll_fd; + int listen_fd = handler->listen_fd; + int timeout = handler->timeout; + + /* 对于客户端模式,没有监听的端口 */ + if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT) + listen_fd = -1; + + while (1) + { + int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, timeout); + if (ret == -1 && errno == EINTR) continue; + else if (ret == -1) + { + MR_ERROR("Ctrlmsg event loop thread failed :waiting on epoll fd : %s", + strerror(errno)); goto errout; + } + + for (int i = 0; i < ret; i++) + { + int fd = evlist[i].data.fd; + + /* 新连接,仅对服务器模式有效 */ + if (fd == listen_fd && (evlist[i].events & EPOLLIN)) + { + struct sockaddr_in remote_addr; + socklen_t sz_remote_addr = sizeof(remote_addr); /* 使用accept4接口,在接收新链接时直接设置非阻塞选项、exec关闭选项 */ int conn_fd = accept4(listen_fd, (struct sockaddr *)&remote_addr, &sz_remote_addr, SOCK_CLOEXEC | SOCK_NONBLOCK); - if (conn_fd < 0) - { - MR_ERROR("Accept remote connnection failed : %s", strerror(errno)); - continue; - } - - __handle_new_conn(handler, epoll_fd, conn_fd, remote_addr); - continue; - } - - /* 已经存在的连接,先查找对应的conn */ - struct ctrlmsg_conn * conn = __lookup_conn_by_fd(handler, fd); - if (conn == NULL) - { - MR_ERROR("Cannot locate conn structure for fd %d", fd); - continue; - } - - /* 异常链接 */ - if (evlist[i].events & (EPOLLHUP | EPOLLERR)) - { - __handle_close(handler, conn, epoll_fd, fd); - continue; - } - - /* 数据到达 */ - if (evlist[i].events & EPOLLIN) - { - __handle_read(handler, conn, epoll_fd, fd); - continue; - } - - /* 描述符可写 */ - if (evlist[i].events & EPOLLOUT) - { - __handle_write(handler, conn, epoll_fd, fd); - continue; - } - } - } + if (conn_fd < 0) + { + MR_ERROR("Accept remote connnection failed : %s", strerror(errno)); + continue; + } + + __handle_new_conn(handler, epoll_fd, conn_fd, remote_addr); + continue; + } + + /* 已经存在的连接,先查找对应的conn */ + struct ctrlmsg_conn * conn = __lookup_conn_by_fd(handler, fd); + if (conn == NULL) + { + MR_ERROR("Cannot locate conn structure for fd %d", fd); + continue; + } + + /* 异常链接 */ + if (evlist[i].events & (EPOLLHUP | EPOLLERR)) + { + __handle_close(handler, conn, epoll_fd, fd); + continue; + } + + /* 数据到达 */ + if (evlist[i].events & EPOLLIN) + { + __handle_read(handler, conn, epoll_fd, fd); + continue; + } + + /* 描述符可写 */ + if (evlist[i].events & EPOLLOUT) + { + __handle_write(handler, conn, epoll_fd, fd); + continue; + } + } + + /* 超时事件,喂狗 */ + if (ret == 0 && timeout >= 0) + __handle_timeout(handler); + } errout: - MR_ERROR("Ctrlmsg event loop thread terminated. "); - return NULL; + MR_ERROR("Ctrlmsg event loop thread terminated. "); + return NULL; } static int __common_mode_create(struct ctrlmsg_handler * handle) { int epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (epoll_fd < 0) - { - MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno)); - return RT_ERR; - } - - int event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); - if(eventfd < 0) - { - MR_ERROR("Cannot create event fd: %s. ", strerror(errno)); - return RT_ERR; - } - - handle->epoll_fd = epoll_fd; - handle->event_fd = event_fd; - return RT_SUCCESS; + if (epoll_fd < 0) + { + MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno)); + return RT_ERR; + } + + int event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (eventfd < 0) + { + MR_ERROR("Cannot create event fd: %s. ", strerror(errno)); + return RT_ERR; + } + + handle->epoll_fd = epoll_fd; + handle->event_fd = event_fd; + return RT_SUCCESS; } static int __server_mode_create(struct ctrlmsg_handler * handle, struct sockaddr_in sockaddr_in) { - int ret = __common_mode_create(handle); - if (ret < 0) return ret; - - // 监听本地端口,TCP连接 - int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); - if (listen_fd < 0) - { - MR_ERROR("Create event listen fd failed : %s.", strerror(errno)); - goto out; - } - - int reuse = 1; - if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&reuse, sizeof(int)) < 0) - { - MR_ERROR("Set crash event listen fd reuse failed : %s", strerror(errno)); - goto out; - } - - ret = bind(listen_fd, (struct sockaddr *)&sockaddr_in, sizeof(struct sockaddr_in)); - if (ret < 0) - { - MR_ERROR("Bind ctrlmsg listen fd failed : %s.", strerror(errno)); - goto out; - } - - ret = listen(listen_fd, 20); - if (ret < 0) - { - MR_ERROR("Listen ctrlmsg listen fd failed : %s.", strerror(errno)); - goto out; - } - - __epoll_add_event(handle->epoll_fd, listen_fd, EPOLLIN); - - char str_addr[MR_STRING_MAX]; - inet_ntop(AF_INET, &(sockaddr_in.sin_addr), str_addr, sizeof(str_addr)); - + int ret = __common_mode_create(handle); + if (ret < 0) return ret; + + // 监听本地端口,TCP连接 + int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (listen_fd < 0) + { + MR_ERROR("Create event listen fd failed : %s.", strerror(errno)); + goto out; + } + + int reuse = 1; + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&reuse, sizeof(int)) < 0) + { + MR_ERROR("Set crash event listen fd reuse failed : %s", strerror(errno)); + goto out; + } + + ret = bind(listen_fd, (struct sockaddr *)&sockaddr_in, sizeof(struct sockaddr_in)); + if (ret < 0) + { + MR_ERROR("Bind ctrlmsg listen fd failed : %s.", strerror(errno)); + goto out; + } + + ret = listen(listen_fd, 20); + if (ret < 0) + { + MR_ERROR("Listen ctrlmsg listen fd failed : %s.", strerror(errno)); + goto out; + } + + __epoll_add_event(handle->epoll_fd, listen_fd, EPOLLIN); + + char str_addr[MR_STRING_MAX]; + inet_ntop(AF_INET, &(sockaddr_in.sin_addr), str_addr, sizeof(str_addr)); + MR_INFO("Ctrlplane message module: Server mode"); MR_INFO("Ctrlplane message server: %s:%d", str_addr, ntohs(sockaddr_in.sin_port)); - - // 处理公共描述符 - handle->listen_fd = listen_fd; - return RT_SUCCESS; + + // 处理公共描述符 + handle->listen_fd = listen_fd; + return RT_SUCCESS; out: - if (listen_fd > 0) close(listen_fd); - return RT_ERR; + if (listen_fd > 0) close(listen_fd); + return RT_ERR; } static int __client_mode_create(struct ctrlmsg_handler * handle, struct sockaddr_in sockaddr_in) { - int ret = __common_mode_create(handle); - if (ret < 0) return ret; - - int conn_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); - if (conn_fd < 0) - { - MR_ERROR("Cannot create socket for ctrlplane message : %s.", strerror(errno)); - return RT_ERR; - } - - ret = connect(conn_fd, (const struct sockaddr *)&sockaddr_in, sizeof(sockaddr_in)); - if (ret < 0) - { - MR_ERROR("Connect to ctrlplane message service failed : %s.", strerror(errno)); - return RT_ERR; - } + int ret = __common_mode_create(handle); + if (ret < 0) return ret; + + int conn_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (conn_fd < 0) + { + MR_ERROR("Cannot create socket for ctrlplane message : %s.", strerror(errno)); + return RT_ERR; + } + + ret = connect(conn_fd, (const struct sockaddr *)&sockaddr_in, sizeof(sockaddr_in)); + if (ret < 0) + { + MR_ERROR("Connect to ctrlplane message service failed : %s.", strerror(errno)); + return RT_ERR; + } char str_server_addr[INET_ADDRSTRLEN] = { 0 }; inet_ntop(AF_INET, &(sockaddr_in.sin_addr), str_server_addr, INET_ADDRSTRLEN); @@ -490,43 +510,45 @@ static int __client_mode_create(struct ctrlmsg_handler * handle, struct sockaddr MR_INFO("Ctrlplane message module: Client mode"); MR_INFO("Ctrlplane message server: %s:%d", str_server_addr, ntohs(sockaddr_in.sin_port)); MR_INFO("Ctrlplane message client: %s:%d", str_client_addr, ntohs(local_sockaddr_in->sin_port)); - - __handle_new_conn(handle, handle->epoll_fd, conn_fd, sockaddr_in); - return RT_SUCCESS; + + __handle_new_conn(handle, handle->epoll_fd, conn_fd, sockaddr_in); + return RT_SUCCESS; } -struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode, - struct sockaddr_in sockaddr, void * arg) +struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode, struct sockaddr_in sockaddr, + void * arg, int timeout) { - struct ctrlmsg_handler * handler; - handler = LIBC_MALLOC(sizeof(struct ctrlmsg_handler)); - MR_VERIFY_MALLOC(handler); - - int ret = 0; - handler->mode = mode; - handler->epoll_fd = 0; - handler->event_fd = 0; - handler->listen_fd = 0; - TAILQ_INIT(&handler->cb_conn_close_list); - TAILQ_INIT(&handler->cb_conn_new_list); - TAILQ_INIT(&handler->cb_list); - TAILQ_INIT(&handler->conn_list); - - if (handler->mode == CTRLMSG_HANDLER_MODE_SERVER) - { - ret = __server_mode_create(handler, sockaddr); - } - else if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT) - { - ret = __client_mode_create(handler, sockaddr); - } - - if (ret < 0) goto errout; - return handler; + struct ctrlmsg_handler * handler; + handler = LIBC_MALLOC(sizeof(struct ctrlmsg_handler)); + MR_VERIFY_MALLOC(handler); + + int ret = 0; + handler->mode = mode; + handler->timeout = timeout; + handler->epoll_fd = 0; + handler->event_fd = 0; + handler->listen_fd = 0; + + TAILQ_INIT(&handler->cb_conn_close_list); + TAILQ_INIT(&handler->cb_conn_new_list); + TAILQ_INIT(&handler->cb_list); + TAILQ_INIT(&handler->conn_list); + + if (handler->mode == CTRLMSG_HANDLER_MODE_SERVER) + { + ret = __server_mode_create(handler, sockaddr); + } + else if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT) + { + ret = __client_mode_create(handler, sockaddr); + } + + if (ret < 0) goto errout; + return handler; errout: - if (handler != NULL) free(handler); - return NULL; + if (handler != NULL) free(handler); + return NULL; } int ctrlmsg_thread_launch(struct ctrlmsg_handler * handler) @@ -544,21 +566,21 @@ int ctrlmsg_thread_launch(struct ctrlmsg_handler * handler) } void ctrlmsg_msg_reciver_register(struct ctrlmsg_handler * handler, const char * msg_topic, - unsigned int msg_type, ctrlmsg_cb_t cb, void * arg) + unsigned int msg_type, ctrlmsg_cb_t cb, void * arg) { - struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb)); - MR_VERIFY_MALLOC(cb_object); - - cb_object->fn_ctrlmsg_cb = cb; - cb_object->arg = arg; - cb_object->type = msg_type; - strncpy(cb_object->topic, msg_topic, sizeof(cb_object->topic)); - TAILQ_INSERT_TAIL(&handler->cb_list, cb_object, next); - return; + struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb)); + MR_VERIFY_MALLOC(cb_object); + + cb_object->fn_ctrlmsg_cb = cb; + cb_object->arg = arg; + cb_object->type = msg_type; + strncpy(cb_object->topic, msg_topic, sizeof(cb_object->topic)); + TAILQ_INSERT_TAIL(&handler->cb_list, cb_object, next); + return; } int ctrlmsg_msg_send(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn, - struct ctrl_msg_header * header) + struct ctrl_msg_header * header) { if (conn == NULL && handler->mode == CTRLMSG_HANDLER_MODE_CLIENT) conn = TAILQ_FIRST(&handler->conn_list); @@ -570,31 +592,31 @@ int ctrlmsg_msg_send(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * con conn->snd_buffer_used += msg_len; __epoll_mod_event(handler->epoll_fd, conn->conn_fd, EPOLLOUT | EPOLLIN); - return 0; + return 0; } -void ctrlmsg_event_conn_new_register(struct ctrlmsg_handler * handler, - ctrlmsg_cb_t cb, void * arg) +void ctrlmsg_event_conn_new_register(struct ctrlmsg_handler * handler, + ctrlmsg_cb_t cb, void * arg) { - struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb)); - MR_VERIFY_MALLOC(cb_object); - - memset(cb_object, 0, sizeof(struct ctrlmsg_cb)); - cb_object->fn_ctrlmsg_cb = cb; - cb_object->arg = arg; - TAILQ_INSERT_TAIL(&handler->cb_conn_new_list, cb_object, next); - return; + struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb)); + MR_VERIFY_MALLOC(cb_object); + + memset(cb_object, 0, sizeof(struct ctrlmsg_cb)); + cb_object->fn_ctrlmsg_cb = cb; + cb_object->arg = arg; + TAILQ_INSERT_TAIL(&handler->cb_conn_new_list, cb_object, next); + return; } void ctrlmsg_event_conn_close_register(struct ctrlmsg_handler * handler, - ctrlmsg_cb_t cb, void * arg) + ctrlmsg_cb_t cb, void * arg) { - struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb)); - MR_VERIFY_MALLOC(cb_object); - - memset(cb_object, 0, sizeof(struct ctrlmsg_cb)); - cb_object->fn_ctrlmsg_cb = cb; - cb_object->arg = arg; - TAILQ_INSERT_TAIL(&handler->cb_conn_close_list, cb_object, next); - return; + struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb)); + MR_VERIFY_MALLOC(cb_object); + + memset(cb_object, 0, sizeof(struct ctrlmsg_cb)); + cb_object->fn_ctrlmsg_cb = cb; + cb_object->arg = arg; + TAILQ_INSERT_TAIL(&handler->cb_conn_close_list, cb_object, next); + return; }
\ No newline at end of file diff --git a/service/include/sc_common.h b/service/include/sc_common.h index e47d81b..4208e77 100644 --- a/service/include/sc_common.h +++ b/service/include/sc_common.h @@ -67,6 +67,11 @@ struct sc_main /* 异常状态检测标志位,死锁检测 */ unsigned int en_spinlock_check; + /* 异常状态检测标志位,控制死锁检测 */ + unsigned int en_ctrl_spinlock_check; + /* 控制死锁检测检测间隔 */ + unsigned int ctrl_spinlock_check_interval; + /* 异常状态检测标志位,内存泄露检测 */ unsigned int en_memleak_check; /* 软件报文时间戳 */ diff --git a/service/src/core.c b/service/src/core.c index e9816f1..065bb18 100644 --- a/service/src/core.c +++ b/service/src/core.c @@ -71,27 +71,35 @@ const char service_git_version[] = ""; #endif #ifndef MR_SERVICE_DEFAULT_CHECK_SPINLOCK -#define MR_SERVICE_DEFAULT_CHECK_SPINLOCK 1 +#define MR_SERVICE_DEFAULT_CHECK_SPINLOCK 1 +#endif + +#ifndef MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK +#define MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK 1 +#endif + +#ifndef MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK_INTERVAL +#define MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK_INTERVAL 1000 #endif #ifndef MR_SERVICE_DEFAULT_CHECK_MEMLEAK -#define MR_SERVICE_DEFAULT_CHECK_MEMLEAK 1 +#define MR_SERVICE_DEFAULT_CHECK_MEMLEAK 1 #endif #ifndef MR_SERVICE_DEFAULT_PKT_TIMESTAMP -#define MR_SERVICE_DEFAULT_PKT_TIMESTAMP 0 +#define MR_SERVICE_DEFAULT_PKT_TIMESTAMP 0 #endif #ifndef MR_SERVICE_DEFAULT_PKT_DUMPER -#define MR_SERVICE_DEFAULT_PKT_DUMPER 1 +#define MR_SERVICE_DEFAULT_PKT_DUMPER 1 #endif #ifndef MR_SERVICE_DEFAULT_PKT_LATENCY -#define MR_SERVICE_DEFAULT_PKT_LATENCY 0 +#define MR_SERVICE_DEFAULT_PKT_LATENCY 0 #endif #ifndef MR_SERVICE_DEFAULT_PKT_LATENCY_LCORE_ID -#define MR_SERVICE_DEFAULT_PKT_LATENCY_LCORE_ID 0 +#define MR_SERVICE_DEFAULT_PKT_LATENCY_LCORE_ID 0 #endif unsigned int g_logger_to_stdout = 1; @@ -398,9 +406,16 @@ static int sc_ctrlmsg_init(struct sc_main * sc) sockaddr_in.sin_port = htons(ctrlmsg_port); sockaddr_in.sin_family = AF_INET; + int __timeout = -1; + if (sc->en_ctrl_spinlock_check > 0) __timeout = sc->ctrl_spinlock_check_interval; + /* 创建消息处理框架句柄 */ - sc->ctrlmsg_handler = ctrlmsg_handler_create(CTRLMSG_HANDLER_MODE_SERVER, sockaddr_in, NULL); - if (sc->ctrlmsg_handler == NULL) return RT_ERR; + sc->ctrlmsg_handler = ctrlmsg_handler_create(CTRLMSG_HANDLER_MODE_SERVER, + sockaddr_in, NULL, __timeout); + + if (sc->ctrlmsg_handler == NULL) + return RT_ERR; + return RT_SUCCESS; } @@ -466,6 +481,14 @@ static int sc_g_config_init(struct sc_main * sc) MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_spinlock", &sc->en_spinlock_check, MR_SERVICE_DEFAULT_CHECK_SPINLOCK); + /* 控制面线程保活 */ + MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_ctrl_spinlock", + &sc->en_ctrl_spinlock_check, MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK); + + /* 控制面线程保活间隔 */ + MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_ctrl_spinlock_interval", + &sc->ctrl_spinlock_check_interval, MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK_INTERVAL); + /* 内存泄露检测 */ MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_memleak", &sc->en_memleak_check, MR_SERVICE_DEFAULT_CHECK_MEMLEAK); diff --git a/tools/systemd/mrzcpd.service.in b/tools/systemd/mrzcpd.service.in index 71d2bc4..1952b3c 100644 --- a/tools/systemd/mrzcpd.service.in +++ b/tools/systemd/mrzcpd.service.in @@ -15,6 +15,7 @@ ExecStopPost=/usr/bin/rm -rf /run/.dpdk Restart=always RestartSec=5s +WatchdogSec=5s Type=notify [Install] |
