summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Qiuwen Lu <[email protected]> 2017-10-20 14:30:24 +0800
committer Qiuwen Lu <[email protected]> 2017-10-20 14:30:24 +0800
commit 9370fdec4c2f84070d85cf6c74a1fe58eb5978d1 (patch)
tree ee4dfabaedf08f61370eed9d7de9a8d0efb00749
parent 8d3bd791d0992f87ae0cf7e114a1c356e3e7d45b (diff)
增加控制线程(控制指令通信)保活功能
- 增加控制线程(控制指令通信)保活功能,定期喂狗。当控制线程死锁时,systemd看门狗将进程杀死。
-rw-r--r-- infra/include/ctrlmsg.h 6
-rw-r--r-- infra/src/ctrlmsg.c 656
-rw-r--r-- service/include/sc_common.h 5
-rw-r--r-- service/src/core.c 39
-rw-r--r-- tools/systemd/mrzcpd.service.in 1
5 files changed, 380 insertions, 327 deletions
diff --git a/infra/include/ctrlmsg.h b/infra/include/ctrlmsg.h
index 68c2ecf..c33c7dc 100644
--- a/infra/include/ctrlmsg.h
+++ b/infra/include/ctrlmsg.h
@@ -99,6 +99,8 @@ struct ctrlmsg_handler
int event_fd;
/* 监听FD */
int listen_fd;
+ /* 保活间隔 */
+ int timeout;
/* 有效连接链表 */
struct ctrlmsg_conn_list conn_list;
/* 回调函数列表 */
@@ -109,8 +111,8 @@ struct ctrlmsg_handler
struct ctrlmsg_cb_list cb_conn_close_list;
};
-struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode,
- struct sockaddr_in sockaddr, void * arg);
+struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode, struct sockaddr_in sockaddr,
+ void * arg, int timeout);
int ctrlmsg_thread_launch(struct ctrlmsg_handler * handler);
diff --git a/infra/src/ctrlmsg.c b/infra/src/ctrlmsg.c
index 060ac1e..aedd4c1 100644
--- a/infra/src/ctrlmsg.c
+++ b/infra/src/ctrlmsg.c
@@ -19,13 +19,15 @@
#include <arpa/inet.h>
#include <errno.h>
+#include <systemd/sd-daemon.h>
+
#define __EV_MAX_EVENTS 16
#define __EV_MAX_BUFFER 2048
#define __CONN_MAX_BUFFER 8192
static void __cleanup_unlock_mutex(pthread_mutex_t ** lock)
{
- pthread_mutex_unlock(*lock);
+ pthread_mutex_unlock(*lock);
}
#define __MUTEX_LOCK(lock) \
@@ -35,52 +37,52 @@ static void __cleanup_unlock_mutex(pthread_mutex_t ** lock)
static int __tcp_setnonblock(int sockfd)
{
- int flags = fcntl(sockfd, F_GETFL, 0);
- if (flags == -1) return -1;
- int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK, 0);
- if (ret == -1) return -1;
- return 0;
+ int flags = fcntl(sockfd, F_GETFL, 0);
+ if (flags == -1) return -1;
+ int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK, 0);
+ if (ret == -1) return -1;
+ return 0;
}
static int __update_epoll_event(int epoll_fd, int epoll_mode, int fd, int epoll_type)
{
- struct epoll_event event = { 0 };
- event.data.fd = fd;
- event.events = epoll_type;
+ struct epoll_event event = { 0 };
+ event.data.fd = fd;
+ event.events = epoll_type;
- __tcp_setnonblock(fd);
- if (-1 == epoll_ctl(epoll_fd, epoll_mode, fd, &event))
- {
- MR_ERROR("update epoll event failed : %s\n", strerror(errno)); return -1;
- }
+ __tcp_setnonblock(fd);
+ if (-1 == epoll_ctl(epoll_fd, epoll_mode, fd, &event))
+ {
+ MR_ERROR("update epoll event failed : %s\n", strerror(errno)); return -1;
+ }
- return 0;
+ return 0;
}
int __epoll_add_event(int epoll_fd, int sd, int epoll_type)
{
- return __update_epoll_event(epoll_fd, EPOLL_CTL_ADD, sd, epoll_type);
+ return __update_epoll_event(epoll_fd, EPOLL_CTL_ADD, sd, epoll_type);
}
int __epoll_del_event(int epoll_fd, int sd, int epoll_type)
{
- return __update_epoll_event(epoll_fd, EPOLL_CTL_DEL, sd, epoll_type);
+ return __update_epoll_event(epoll_fd, EPOLL_CTL_DEL, sd, epoll_type);
}
int __epoll_mod_event(int epoll_fd, int sd, int epoll_type)
{
- return __update_epoll_event(epoll_fd, EPOLL_CTL_MOD, sd, epoll_type);
+ return __update_epoll_event(epoll_fd, EPOLL_CTL_MOD, sd, epoll_type);
}
struct ctrlmsg_conn * __lookup_conn_by_fd(struct ctrlmsg_handler * handler, int fd)
{
- struct ctrlmsg_conn * conn_iter = NULL;
- TAILQ_FOREACH(conn_iter, &handler->conn_list, next)
- {
- if (conn_iter->conn_fd == fd) return conn_iter;
- }
+ struct ctrlmsg_conn * conn_iter = NULL;
+ TAILQ_FOREACH(conn_iter, &handler->conn_list, next)
+ {
+ if (conn_iter->conn_fd == fd) return conn_iter;
+ }
- return NULL;
+ return NULL;
}
enum parse_status
@@ -101,7 +103,7 @@ static int parse_line(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * co
unsigned int * m_checked_idx = &conn->rcv_buffer_checked;
unsigned int * m_read_idx = &conn->rcv_buffer_used;
char * m_read_buf = conn->rcv_buffer;
-
+
for (; *m_checked_idx < *m_read_idx; (*m_checked_idx)++)
{
char temp = conn->rcv_buffer[conn->rcv_buffer_checked];
@@ -125,7 +127,7 @@ static int parse_line(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * co
{
if ((*m_checked_idx > 1) && (m_read_buf[*m_checked_idx - 1] == '\r'))
{
- m_read_buf[(*m_checked_idx)-1] = '\0';
+ m_read_buf[(*m_checked_idx) - 1] = '\0';
m_read_buf[(*m_checked_idx)++] = '\0';
return PARSE_OK;
}
@@ -142,10 +144,10 @@ static int parse_content_json(struct ctrlmsg_handler * handler, struct ctrlmsg_c
return 0;
}
-static int parse_content_binary(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn,
- int epoll_fd, int fd)
+static int parse_content_binary(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn,
+ int epoll_fd, int fd)
{
- /* 缓冲区的数据还不够头部,退出循环 */
+ /* 缓冲区的数据还不够头部,退出循环 */
while (conn->rcv_buffer_used >= sizeof(struct ctrl_msg_header))
{
/* 缓冲区已经收到了头部,但数据长度不足报文声明的长度,数据不全 */
@@ -179,110 +181,110 @@ static int parse_content_binary(struct ctrlmsg_handler * handler, struct ctrlmsg
}
/* 处理新建的监听连接 */
-void __handle_new_conn(struct ctrlmsg_handler * handler,
- int epoll_fd, int conn_fd, struct sockaddr_in remote_addr)
+void __handle_new_conn(struct ctrlmsg_handler * handler,
+ int epoll_fd, int conn_fd, struct sockaddr_in remote_addr)
{
- /* 分配新的APP-CONN结构体,只保存链接信息,不创建应用信息 */
- struct ctrlmsg_conn * conn = LIBC_MALLOC(sizeof(struct ctrlmsg_conn));
- MR_VERIFY_MALLOC(conn);
-
- conn->conn_fd = conn_fd;
- conn->peer_sockaddr_in = remote_addr;
- conn->rcv_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER);
- conn->rcv_buffer_max = __CONN_MAX_BUFFER;
- conn->rcv_buffer_used = 0;
- conn->snd_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER);
- conn->snd_buffer_max = __CONN_MAX_BUFFER;
- conn->snd_buffer_used = 0;
+ /* 分配新的APP-CONN结构体,只保存链接信息,不创建应用信息 */
+ struct ctrlmsg_conn * conn = LIBC_MALLOC(sizeof(struct ctrlmsg_conn));
+ MR_VERIFY_MALLOC(conn);
+
+ conn->conn_fd = conn_fd;
+ conn->peer_sockaddr_in = remote_addr;
+ conn->rcv_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER);
+ conn->rcv_buffer_max = __CONN_MAX_BUFFER;
+ conn->rcv_buffer_used = 0;
+ conn->snd_buffer = LIBC_MALLOC(__CONN_MAX_BUFFER);
+ conn->snd_buffer_max = __CONN_MAX_BUFFER;
+ conn->snd_buffer_used = 0;
conn->snd_buffer_deal = 0;
- MR_VERIFY_MALLOC(conn->rcv_buffer);
- MR_VERIFY_MALLOC(conn->snd_buffer);
+ MR_VERIFY_MALLOC(conn->rcv_buffer);
+ MR_VERIFY_MALLOC(conn->snd_buffer);
/* 初始化锁,使用递归锁 */
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&conn->mutex, &mutex_attr);
-
- /* 加入链接表中 */
- TAILQ_INSERT_TAIL(&handler->conn_list, conn, next);
-
- /* 调用新链接回调函数 */
- struct ctrlmsg_cb * cb_object;
- TAILQ_FOREACH(cb_object, &handler->cb_conn_new_list, next)
- {
- cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg);
- }
-
- __epoll_add_event(epoll_fd, conn_fd, EPOLLIN);
- return;
+
+ /* 加入链接表中 */
+ TAILQ_INSERT_TAIL(&handler->conn_list, conn, next);
+
+ /* 调用新链接回调函数 */
+ struct ctrlmsg_cb * cb_object;
+ TAILQ_FOREACH(cb_object, &handler->cb_conn_new_list, next)
+ {
+ cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg);
+ }
+
+ __epoll_add_event(epoll_fd, conn_fd, EPOLLIN);
+ return;
}
// 处理链接关闭信息
void __handle_close(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn,
- int epoll_fd, int fd)
+ int epoll_fd, int fd)
{
- /* 调用链接终止回调函数 */
- struct ctrlmsg_cb * cb_object;
- TAILQ_FOREACH(cb_object, &handler->cb_conn_close_list, next)
- {
- cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg);
- }
-
- __epoll_del_event(epoll_fd, fd, EPOLLIN);
- close(fd);
-
- TAILQ_REMOVE(&handler->conn_list, conn, next);
- free(conn->rcv_buffer);
- free(conn->snd_buffer);
- free(conn);
- return;
+ /* 调用链接终止回调函数 */
+ struct ctrlmsg_cb * cb_object;
+ TAILQ_FOREACH(cb_object, &handler->cb_conn_close_list, next)
+ {
+ cb_object->fn_ctrlmsg_cb(handler, conn, NULL, cb_object->arg);
+ }
+
+ __epoll_del_event(epoll_fd, fd, EPOLLIN);
+ close(fd);
+
+ TAILQ_REMOVE(&handler->conn_list, conn, next);
+ free(conn->rcv_buffer);
+ free(conn->snd_buffer);
+ free(conn);
+ return;
}
/* 处理监听连接到达的数据 */
void __handle_read(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn,
- int epoll_fd, int fd)
+ int epoll_fd, int fd)
{
__MUTEX_LOCK(&conn->mutex);
- for (;;)
- {
- int ret = recv(fd, RTE_PTR_ADD(conn->rcv_buffer, conn->rcv_buffer_used),
- conn->rcv_buffer_max - conn->rcv_buffer_used, 0);
-
- if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
- {
- break;
- }
-
- if (ret < 0)
- {
- MR_ERROR("File descripter recv error: %s", strerror(errno));
- __handle_close(handler, conn, epoll_fd, fd);
- return;
- }
-
- if (ret == 0)
- {
- __handle_close(handler, conn, epoll_fd, fd);
- return;
- }
-
- conn->rcv_buffer_used += ret;
- }
-
- parse_content_binary(handler, conn, epoll_fd, fd);
- return;
+ for (;;)
+ {
+ int ret = recv(fd, RTE_PTR_ADD(conn->rcv_buffer, conn->rcv_buffer_used),
+ conn->rcv_buffer_max - conn->rcv_buffer_used, 0);
+
+ if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+ {
+ break;
+ }
+
+ if (ret < 0)
+ {
+ MR_ERROR("File descripter recv error: %s", strerror(errno));
+ __handle_close(handler, conn, epoll_fd, fd);
+ return;
+ }
+
+ if (ret == 0)
+ {
+ __handle_close(handler, conn, epoll_fd, fd);
+ return;
+ }
+
+ conn->rcv_buffer_used += ret;
+ }
+
+ parse_content_binary(handler, conn, epoll_fd, fd);
+ return;
}
void __handle_write(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn,
- int epoll_fd, int fd)
+ int epoll_fd, int fd)
{
- int ret = 0;
+ int ret = 0;
__MUTEX_LOCK(&conn->mutex);
- do {
- ret = send(fd, conn->snd_buffer, conn->snd_buffer_used, MSG_NOSIGNAL);
+ do {
+ ret = send(fd, conn->snd_buffer, conn->snd_buffer_used, MSG_NOSIGNAL);
} while ((ret == -1 && (errno == EINTR)));
conn->snd_buffer_deal += ret;
@@ -294,182 +296,200 @@ void __handle_write(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn
conn->snd_buffer_deal = 0;
__epoll_mod_event(handler->epoll_fd, conn->conn_fd, EPOLLIN);
}
-
- return;
+
+ return;
}
+/*
+ * Timeout hook of the control-message event loop: notifies the systemd
+ * watchdog that the control thread is still making progress.
+ *
+ * handler: the control-message handler whose loop timed out (currently unused).
+ */
+void __handle_timeout(struct ctrlmsg_handler * handler)
+{
+    /* TODO: this should not be done here; it belongs outside, invoked through a callback. */
+    (void)handler;
+
+    /* sd_watchdog_enabled() returns > 0 when the watchdog is armed, 0 when it is
+     * not, and a negative errno-style value on failure. Only a positive result
+     * means a keep-alive should be sent, so an error must not count as "enabled". */
+    if (sd_watchdog_enabled(0, NULL) > 0)
+    {
+        sd_notify(0, "WATCHDOG=1");
+    }
+
+    return;
+}
static void * __epoll_event_loop(void * arg)
{
- struct ctrlmsg_handler * handler = (struct ctrlmsg_handler *)arg;
- struct epoll_event evlist[__EV_MAX_EVENTS];
-
- int epoll_fd = handler->epoll_fd;
- int listen_fd = handler->listen_fd;
-
- /* 对于客户端模式,没有监听的端口 */
- if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT) listen_fd = -1;
-
- while (1)
- {
- int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, -1);
- if (ret == -1 && errno == EINTR) continue;
- else if (ret == -1)
- {
- MR_ERROR("Ctrlmsg event loop thread failed :waiting on epoll fd : %s",
- strerror(errno)); goto errout;
- }
-
- for (int i = 0; i < ret; i++)
- {
- int fd = evlist[i].data.fd;
-
- /* 新连接,仅对服务器模式有效 */
- if (fd == listen_fd && (evlist[i].events & EPOLLIN))
- {
- struct sockaddr_in remote_addr;
- socklen_t sz_remote_addr = sizeof(remote_addr);
+ struct ctrlmsg_handler * handler = (struct ctrlmsg_handler *)arg;
+ struct epoll_event evlist[__EV_MAX_EVENTS];
+
+ int epoll_fd = handler->epoll_fd;
+ int listen_fd = handler->listen_fd;
+ int timeout = handler->timeout;
+
+ /* 对于客户端模式,没有监听的端口 */
+ if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT)
+ listen_fd = -1;
+
+ while (1)
+ {
+ int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, timeout);
+ if (ret == -1 && errno == EINTR) continue;
+ else if (ret == -1)
+ {
+ MR_ERROR("Ctrlmsg event loop thread failed :waiting on epoll fd : %s",
+ strerror(errno)); goto errout;
+ }
+
+ for (int i = 0; i < ret; i++)
+ {
+ int fd = evlist[i].data.fd;
+
+ /* 新连接,仅对服务器模式有效 */
+ if (fd == listen_fd && (evlist[i].events & EPOLLIN))
+ {
+ struct sockaddr_in remote_addr;
+ socklen_t sz_remote_addr = sizeof(remote_addr);
/* 使用accept4接口,在接收新链接时直接设置非阻塞选项、exec关闭选项 */
int conn_fd = accept4(listen_fd, (struct sockaddr *)&remote_addr,
&sz_remote_addr, SOCK_CLOEXEC | SOCK_NONBLOCK);
- if (conn_fd < 0)
- {
- MR_ERROR("Accept remote connnection failed : %s", strerror(errno));
- continue;
- }
-
- __handle_new_conn(handler, epoll_fd, conn_fd, remote_addr);
- continue;
- }
-
- /* 已经存在的连接,先查找对应的conn */
- struct ctrlmsg_conn * conn = __lookup_conn_by_fd(handler, fd);
- if (conn == NULL)
- {
- MR_ERROR("Cannot locate conn structure for fd %d", fd);
- continue;
- }
-
- /* 异常链接 */
- if (evlist[i].events & (EPOLLHUP | EPOLLERR))
- {
- __handle_close(handler, conn, epoll_fd, fd);
- continue;
- }
-
- /* 数据到达 */
- if (evlist[i].events & EPOLLIN)
- {
- __handle_read(handler, conn, epoll_fd, fd);
- continue;
- }
-
- /* 描述符可写 */
- if (evlist[i].events & EPOLLOUT)
- {
- __handle_write(handler, conn, epoll_fd, fd);
- continue;
- }
- }
- }
+ if (conn_fd < 0)
+ {
+ MR_ERROR("Accept remote connnection failed : %s", strerror(errno));
+ continue;
+ }
+
+ __handle_new_conn(handler, epoll_fd, conn_fd, remote_addr);
+ continue;
+ }
+
+ /* 已经存在的连接,先查找对应的conn */
+ struct ctrlmsg_conn * conn = __lookup_conn_by_fd(handler, fd);
+ if (conn == NULL)
+ {
+ MR_ERROR("Cannot locate conn structure for fd %d", fd);
+ continue;
+ }
+
+ /* 异常链接 */
+ if (evlist[i].events & (EPOLLHUP | EPOLLERR))
+ {
+ __handle_close(handler, conn, epoll_fd, fd);
+ continue;
+ }
+
+ /* 数据到达 */
+ if (evlist[i].events & EPOLLIN)
+ {
+ __handle_read(handler, conn, epoll_fd, fd);
+ continue;
+ }
+
+ /* 描述符可写 */
+ if (evlist[i].events & EPOLLOUT)
+ {
+ __handle_write(handler, conn, epoll_fd, fd);
+ continue;
+ }
+ }
+
+ /* 超时事件,喂狗 */
+ if (ret == 0 && timeout >= 0)
+ __handle_timeout(handler);
+ }
errout:
- MR_ERROR("Ctrlmsg event loop thread terminated. ");
- return NULL;
+ MR_ERROR("Ctrlmsg event loop thread terminated. ");
+ return NULL;
}
+/*
+ * Creates the descriptors shared by server and client mode: an epoll instance
+ * and a non-blocking eventfd, both close-on-exec, and stores them in `handle`.
+ *
+ * Returns RT_SUCCESS on success, RT_ERR if either descriptor cannot be created;
+ * on failure `handle` is left unmodified and no descriptor stays open.
+ */
static int __common_mode_create(struct ctrlmsg_handler * handle)
{
int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
- if (epoll_fd < 0)
- {
- MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno));
- return RT_ERR;
- }
-
- int event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
- if(eventfd < 0)
- {
- MR_ERROR("Cannot create event fd: %s. ", strerror(errno));
- return RT_ERR;
- }
-
- handle->epoll_fd = epoll_fd;
- handle->event_fd = event_fd;
- return RT_SUCCESS;
+    if (epoll_fd < 0)
+    {
+        MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno));
+        return RT_ERR;
+    }
+
+    int event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+    /* Check the returned descriptor: the bare name `eventfd` is the function
+     * itself, so comparing it with 0 never detects a failed call. */
+    if (event_fd < 0)
+    {
+        MR_ERROR("Cannot create event fd: %s. ", strerror(errno));
+        /* Release the epoll descriptor created above so it does not leak. */
+        close(epoll_fd);
+        return RT_ERR;
+    }
+
+    handle->epoll_fd = epoll_fd;
+    handle->event_fd = event_fd;
+    return RT_SUCCESS;
}
static int __server_mode_create(struct ctrlmsg_handler * handle, struct sockaddr_in sockaddr_in)
{
- int ret = __common_mode_create(handle);
- if (ret < 0) return ret;
-
- // 监听本地端口,TCP连接
- int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
- if (listen_fd < 0)
- {
- MR_ERROR("Create event listen fd failed : %s.", strerror(errno));
- goto out;
- }
-
- int reuse = 1;
- if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&reuse, sizeof(int)) < 0)
- {
- MR_ERROR("Set crash event listen fd reuse failed : %s", strerror(errno));
- goto out;
- }
-
- ret = bind(listen_fd, (struct sockaddr *)&sockaddr_in, sizeof(struct sockaddr_in));
- if (ret < 0)
- {
- MR_ERROR("Bind ctrlmsg listen fd failed : %s.", strerror(errno));
- goto out;
- }
-
- ret = listen(listen_fd, 20);
- if (ret < 0)
- {
- MR_ERROR("Listen ctrlmsg listen fd failed : %s.", strerror(errno));
- goto out;
- }
-
- __epoll_add_event(handle->epoll_fd, listen_fd, EPOLLIN);
-
- char str_addr[MR_STRING_MAX];
- inet_ntop(AF_INET, &(sockaddr_in.sin_addr), str_addr, sizeof(str_addr));
-
+ int ret = __common_mode_create(handle);
+ if (ret < 0) return ret;
+
+ // 监听本地端口,TCP连接
+ int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ if (listen_fd < 0)
+ {
+ MR_ERROR("Create event listen fd failed : %s.", strerror(errno));
+ goto out;
+ }
+
+ int reuse = 1;
+ if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&reuse, sizeof(int)) < 0)
+ {
+ MR_ERROR("Set crash event listen fd reuse failed : %s", strerror(errno));
+ goto out;
+ }
+
+ ret = bind(listen_fd, (struct sockaddr *)&sockaddr_in, sizeof(struct sockaddr_in));
+ if (ret < 0)
+ {
+ MR_ERROR("Bind ctrlmsg listen fd failed : %s.", strerror(errno));
+ goto out;
+ }
+
+ ret = listen(listen_fd, 20);
+ if (ret < 0)
+ {
+ MR_ERROR("Listen ctrlmsg listen fd failed : %s.", strerror(errno));
+ goto out;
+ }
+
+ __epoll_add_event(handle->epoll_fd, listen_fd, EPOLLIN);
+
+ char str_addr[MR_STRING_MAX];
+ inet_ntop(AF_INET, &(sockaddr_in.sin_addr), str_addr, sizeof(str_addr));
+
MR_INFO("Ctrlplane message module: Server mode");
MR_INFO("Ctrlplane message server: %s:%d", str_addr, ntohs(sockaddr_in.sin_port));
-
- // 处理公共描述符
- handle->listen_fd = listen_fd;
- return RT_SUCCESS;
+
+ // 处理公共描述符
+ handle->listen_fd = listen_fd;
+ return RT_SUCCESS;
out:
- if (listen_fd > 0) close(listen_fd);
- return RT_ERR;
+ if (listen_fd > 0) close(listen_fd);
+ return RT_ERR;
}
static int __client_mode_create(struct ctrlmsg_handler * handle, struct sockaddr_in sockaddr_in)
{
- int ret = __common_mode_create(handle);
- if (ret < 0) return ret;
-
- int conn_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
- if (conn_fd < 0)
- {
- MR_ERROR("Cannot create socket for ctrlplane message : %s.", strerror(errno));
- return RT_ERR;
- }
-
- ret = connect(conn_fd, (const struct sockaddr *)&sockaddr_in, sizeof(sockaddr_in));
- if (ret < 0)
- {
- MR_ERROR("Connect to ctrlplane message service failed : %s.", strerror(errno));
- return RT_ERR;
- }
+ int ret = __common_mode_create(handle);
+ if (ret < 0) return ret;
+
+ int conn_fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
+ if (conn_fd < 0)
+ {
+ MR_ERROR("Cannot create socket for ctrlplane message : %s.", strerror(errno));
+ return RT_ERR;
+ }
+
+ ret = connect(conn_fd, (const struct sockaddr *)&sockaddr_in, sizeof(sockaddr_in));
+ if (ret < 0)
+ {
+ MR_ERROR("Connect to ctrlplane message service failed : %s.", strerror(errno));
+ return RT_ERR;
+ }
char str_server_addr[INET_ADDRSTRLEN] = { 0 };
inet_ntop(AF_INET, &(sockaddr_in.sin_addr), str_server_addr, INET_ADDRSTRLEN);
@@ -490,43 +510,45 @@ static int __client_mode_create(struct ctrlmsg_handler * handle, struct sockaddr
MR_INFO("Ctrlplane message module: Client mode");
MR_INFO("Ctrlplane message server: %s:%d", str_server_addr, ntohs(sockaddr_in.sin_port));
MR_INFO("Ctrlplane message client: %s:%d", str_client_addr, ntohs(local_sockaddr_in->sin_port));
-
- __handle_new_conn(handle, handle->epoll_fd, conn_fd, sockaddr_in);
- return RT_SUCCESS;
+
+ __handle_new_conn(handle, handle->epoll_fd, conn_fd, sockaddr_in);
+ return RT_SUCCESS;
}
-struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode,
- struct sockaddr_in sockaddr, void * arg)
+struct ctrlmsg_handler * ctrlmsg_handler_create(enum ctrlmsg_handler_mode mode, struct sockaddr_in sockaddr,
+ void * arg, int timeout)
{
- struct ctrlmsg_handler * handler;
- handler = LIBC_MALLOC(sizeof(struct ctrlmsg_handler));
- MR_VERIFY_MALLOC(handler);
-
- int ret = 0;
- handler->mode = mode;
- handler->epoll_fd = 0;
- handler->event_fd = 0;
- handler->listen_fd = 0;
- TAILQ_INIT(&handler->cb_conn_close_list);
- TAILQ_INIT(&handler->cb_conn_new_list);
- TAILQ_INIT(&handler->cb_list);
- TAILQ_INIT(&handler->conn_list);
-
- if (handler->mode == CTRLMSG_HANDLER_MODE_SERVER)
- {
- ret = __server_mode_create(handler, sockaddr);
- }
- else if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT)
- {
- ret = __client_mode_create(handler, sockaddr);
- }
-
- if (ret < 0) goto errout;
- return handler;
+ struct ctrlmsg_handler * handler;
+ handler = LIBC_MALLOC(sizeof(struct ctrlmsg_handler));
+ MR_VERIFY_MALLOC(handler);
+
+ int ret = 0;
+ handler->mode = mode;
+ handler->timeout = timeout;
+ handler->epoll_fd = 0;
+ handler->event_fd = 0;
+ handler->listen_fd = 0;
+
+ TAILQ_INIT(&handler->cb_conn_close_list);
+ TAILQ_INIT(&handler->cb_conn_new_list);
+ TAILQ_INIT(&handler->cb_list);
+ TAILQ_INIT(&handler->conn_list);
+
+ if (handler->mode == CTRLMSG_HANDLER_MODE_SERVER)
+ {
+ ret = __server_mode_create(handler, sockaddr);
+ }
+ else if (handler->mode == CTRLMSG_HANDLER_MODE_CLIENT)
+ {
+ ret = __client_mode_create(handler, sockaddr);
+ }
+
+ if (ret < 0) goto errout;
+ return handler;
errout:
- if (handler != NULL) free(handler);
- return NULL;
+ if (handler != NULL) free(handler);
+ return NULL;
}
int ctrlmsg_thread_launch(struct ctrlmsg_handler * handler)
@@ -544,21 +566,21 @@ int ctrlmsg_thread_launch(struct ctrlmsg_handler * handler)
}
+/*
+ * Registers a message-receive callback on `handler`.
+ *
+ * Allocates a ctrlmsg_cb, records the callback, its user argument, the message
+ * type and the topic string, and appends it to handler->cb_list.
+ *
+ * handler:   handler that owns the callback list.
+ * msg_topic: topic string; copied, truncated if longer than the topic field.
+ * msg_type:  message type stored with the callback.
+ * cb, arg:   callback function and the opaque argument stored alongside it.
+ */
void ctrlmsg_msg_reciver_register(struct ctrlmsg_handler * handler, const char * msg_topic,
- unsigned int msg_type, ctrlmsg_cb_t cb, void * arg)
+    unsigned int msg_type, ctrlmsg_cb_t cb, void * arg)
{
- struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb));
- MR_VERIFY_MALLOC(cb_object);
-
- cb_object->fn_ctrlmsg_cb = cb;
- cb_object->arg = arg;
- cb_object->type = msg_type;
- strncpy(cb_object->topic, msg_topic, sizeof(cb_object->topic));
- TAILQ_INSERT_TAIL(&handler->cb_list, cb_object, next);
- return;
+    struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb));
+    MR_VERIFY_MALLOC(cb_object);
+
+    /* Zero the object first, as the conn-new / conn-close registration
+     * functions in this file do, so no field is left uninitialized. */
+    memset(cb_object, 0, sizeof(struct ctrlmsg_cb));
+    cb_object->fn_ctrlmsg_cb = cb;
+    cb_object->arg = arg;
+    cb_object->type = msg_type;
+    /* Copy at most size-1 bytes: strncpy() does not terminate the destination
+     * when the source fills it, so the last (already zeroed) byte is kept as
+     * the terminator. */
+    strncpy(cb_object->topic, msg_topic, sizeof(cb_object->topic) - 1);
+    TAILQ_INSERT_TAIL(&handler->cb_list, cb_object, next);
+    return;
}
int ctrlmsg_msg_send(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * conn,
- struct ctrl_msg_header * header)
+ struct ctrl_msg_header * header)
{
if (conn == NULL && handler->mode == CTRLMSG_HANDLER_MODE_CLIENT)
conn = TAILQ_FIRST(&handler->conn_list);
@@ -570,31 +592,31 @@ int ctrlmsg_msg_send(struct ctrlmsg_handler * handler, struct ctrlmsg_conn * con
conn->snd_buffer_used += msg_len;
__epoll_mod_event(handler->epoll_fd, conn->conn_fd, EPOLLOUT | EPOLLIN);
- return 0;
+ return 0;
}
-void ctrlmsg_event_conn_new_register(struct ctrlmsg_handler * handler,
- ctrlmsg_cb_t cb, void * arg)
+void ctrlmsg_event_conn_new_register(struct ctrlmsg_handler * handler,
+ ctrlmsg_cb_t cb, void * arg)
{
- struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb));
- MR_VERIFY_MALLOC(cb_object);
-
- memset(cb_object, 0, sizeof(struct ctrlmsg_cb));
- cb_object->fn_ctrlmsg_cb = cb;
- cb_object->arg = arg;
- TAILQ_INSERT_TAIL(&handler->cb_conn_new_list, cb_object, next);
- return;
+ struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb));
+ MR_VERIFY_MALLOC(cb_object);
+
+ memset(cb_object, 0, sizeof(struct ctrlmsg_cb));
+ cb_object->fn_ctrlmsg_cb = cb;
+ cb_object->arg = arg;
+ TAILQ_INSERT_TAIL(&handler->cb_conn_new_list, cb_object, next);
+ return;
}
void ctrlmsg_event_conn_close_register(struct ctrlmsg_handler * handler,
- ctrlmsg_cb_t cb, void * arg)
+ ctrlmsg_cb_t cb, void * arg)
{
- struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb));
- MR_VERIFY_MALLOC(cb_object);
-
- memset(cb_object, 0, sizeof(struct ctrlmsg_cb));
- cb_object->fn_ctrlmsg_cb = cb;
- cb_object->arg = arg;
- TAILQ_INSERT_TAIL(&handler->cb_conn_close_list, cb_object, next);
- return;
+ struct ctrlmsg_cb * cb_object = LIBC_MALLOC(sizeof(struct ctrlmsg_cb));
+ MR_VERIFY_MALLOC(cb_object);
+
+ memset(cb_object, 0, sizeof(struct ctrlmsg_cb));
+ cb_object->fn_ctrlmsg_cb = cb;
+ cb_object->arg = arg;
+ TAILQ_INSERT_TAIL(&handler->cb_conn_close_list, cb_object, next);
+ return;
} \ No newline at end of file
diff --git a/service/include/sc_common.h b/service/include/sc_common.h
index e47d81b..4208e77 100644
--- a/service/include/sc_common.h
+++ b/service/include/sc_common.h
@@ -67,6 +67,11 @@ struct sc_main
/* 异常状态检测标志位,死锁检测 */
unsigned int en_spinlock_check;
+ /* 异常状态检测标志位,控制死锁检测 */
+ unsigned int en_ctrl_spinlock_check;
+ /* 控制死锁检测检测间隔 */
+ unsigned int ctrl_spinlock_check_interval;
+
/* 异常状态检测标志位,内存泄露检测 */
unsigned int en_memleak_check;
/* 软件报文时间戳 */
diff --git a/service/src/core.c b/service/src/core.c
index e9816f1..065bb18 100644
--- a/service/src/core.c
+++ b/service/src/core.c
@@ -71,27 +71,35 @@ const char service_git_version[] = "";
#endif
#ifndef MR_SERVICE_DEFAULT_CHECK_SPINLOCK
-#define MR_SERVICE_DEFAULT_CHECK_SPINLOCK 1
+#define MR_SERVICE_DEFAULT_CHECK_SPINLOCK 1
+#endif
+
+#ifndef MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK
+#define MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK 1
+#endif
+
+#ifndef MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK_INTERVAL
+#define MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK_INTERVAL 1000
#endif
#ifndef MR_SERVICE_DEFAULT_CHECK_MEMLEAK
-#define MR_SERVICE_DEFAULT_CHECK_MEMLEAK 1
+#define MR_SERVICE_DEFAULT_CHECK_MEMLEAK 1
#endif
#ifndef MR_SERVICE_DEFAULT_PKT_TIMESTAMP
-#define MR_SERVICE_DEFAULT_PKT_TIMESTAMP 0
+#define MR_SERVICE_DEFAULT_PKT_TIMESTAMP 0
#endif
#ifndef MR_SERVICE_DEFAULT_PKT_DUMPER
-#define MR_SERVICE_DEFAULT_PKT_DUMPER 1
+#define MR_SERVICE_DEFAULT_PKT_DUMPER 1
#endif
#ifndef MR_SERVICE_DEFAULT_PKT_LATENCY
-#define MR_SERVICE_DEFAULT_PKT_LATENCY 0
+#define MR_SERVICE_DEFAULT_PKT_LATENCY 0
#endif
#ifndef MR_SERVICE_DEFAULT_PKT_LATENCY_LCORE_ID
-#define MR_SERVICE_DEFAULT_PKT_LATENCY_LCORE_ID 0
+#define MR_SERVICE_DEFAULT_PKT_LATENCY_LCORE_ID 0
#endif
unsigned int g_logger_to_stdout = 1;
@@ -398,9 +406,16 @@ static int sc_ctrlmsg_init(struct sc_main * sc)
sockaddr_in.sin_port = htons(ctrlmsg_port);
sockaddr_in.sin_family = AF_INET;
+ int __timeout = -1;
+ if (sc->en_ctrl_spinlock_check > 0) __timeout = sc->ctrl_spinlock_check_interval;
+
/* 创建消息处理框架句柄 */
- sc->ctrlmsg_handler = ctrlmsg_handler_create(CTRLMSG_HANDLER_MODE_SERVER, sockaddr_in, NULL);
- if (sc->ctrlmsg_handler == NULL) return RT_ERR;
+ sc->ctrlmsg_handler = ctrlmsg_handler_create(CTRLMSG_HANDLER_MODE_SERVER,
+ sockaddr_in, NULL, __timeout);
+
+ if (sc->ctrlmsg_handler == NULL)
+ return RT_ERR;
+
return RT_SUCCESS;
}
@@ -466,6 +481,14 @@ static int sc_g_config_init(struct sc_main * sc)
MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_spinlock",
&sc->en_spinlock_check, MR_SERVICE_DEFAULT_CHECK_SPINLOCK);
+ /* 控制面线程保活 */
+ MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_ctrl_spinlock",
+ &sc->en_ctrl_spinlock_check, MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK);
+
+ /* 控制面线程保活间隔 */
+ MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_ctrl_spinlock_interval",
+ &sc->ctrl_spinlock_check_interval, MR_SERVICE_DEFAULT_CHECK_CTRL_SPINLOCK_INTERVAL);
+
/* 内存泄露检测 */
MESA_load_profile_uint_def(sc->local_cfgfile, "keepalive", "check_memleak",
&sc->en_memleak_check, MR_SERVICE_DEFAULT_CHECK_MEMLEAK);
diff --git a/tools/systemd/mrzcpd.service.in b/tools/systemd/mrzcpd.service.in
index 71d2bc4..1952b3c 100644
--- a/tools/systemd/mrzcpd.service.in
+++ b/tools/systemd/mrzcpd.service.in
@@ -15,6 +15,7 @@ ExecStopPost=/usr/bin/rm -rf /run/.dpdk
Restart=always
RestartSec=5s
+WatchdogSec=5s
Type=notify
[Install]