summaryrefslogtreecommitdiff
path: root/app
diff options
context:
space:
mode:
author童宗振 <[email protected]>2023-12-19 05:32:07 +0000
committer陆秋文 <[email protected]>2023-12-19 05:32:07 +0000
commitb3634ff0763a7c33093c9dbd7ce0e2c53b65c2b6 (patch)
tree274731c8844b8665863fb9b701557ec64b478281 /app
parent2c1c5fb4387c47e40d9cd02f2abbbfaf31ee9b38 (diff)
refactor-ctrlmsg-2
Diffstat (limited to 'app')
-rw-r--r--app/include/mrapp.h14
-rw-r--r--app/src/marsio.c506
2 files changed, 119 insertions, 401 deletions
diff --git a/app/include/mrapp.h b/app/include/mrapp.h
index 9e1a53a..3e6da2c 100644
--- a/app/include/mrapp.h
+++ b/app/include/mrapp.h
@@ -1,14 +1,14 @@
#pragma once
#include <common.h>
-#include <ctrlmsg.h>
#include <ldbc.h>
#include <marsio.h>
+#include <mr_rte_msg.h>
#include <neigh.h>
#include <pcap/pcap.h>
+#include <rte_epoll.h>
#include <tap.h>
#include <vdev_define.h>
-#include <rte_epoll.h>
struct mr_instance;
@@ -112,18 +112,10 @@ struct mr_instance
char g_cfgfile_path[MR_STRING_MAX];
/* 应用配置文件路径 */
char app_cfgfile_path[MR_STRING_MAX];
- /* 消息框架句柄 */
- struct ctrlmsg_handler * ctrlmsg_handler;
/* 虚设备实例列表 */
struct mr_vdev vdevs[MR_VDEV_MAX];
/* 虚设备实例数量 */
unsigned int nr_vdevs;
- /* 异步转同步,等待控制回复 */
- pthread_cond_t cond_ctrlmsg_wait;
- /* 等待控制回复锁 */
- pthread_mutex_t lock_ctrlmsg_wait;
- /* 控制回复到达,置1表示有控制回复需要处理 */
- unsigned int ctrlmsg_wait;
/* 邻居管理器 */
struct neighbour_manager * neigh;
/* 负载均衡器 */
@@ -132,6 +124,8 @@ struct mr_instance
struct mr_static_neigh_list static_neigh_list;
/* 统计 */
struct mrapp_stat stat[MR_SID_MAX];
+ /* 监控marsio是否存活的句柄*/
+ int marsio_fd;
/* ===== 运行选项 ===== */
diff --git a/app/src/marsio.c b/app/src/marsio.c
index 995ea33..578399e 100644
--- a/app/src/marsio.c
+++ b/app/src/marsio.c
@@ -12,10 +12,9 @@
#include <arp.h>
#include <arpa/inet.h>
#include <cJSON.h>
-#include <ctrlmsg.h>
-#include <ctrlmsg_define.h>
#include <libgen.h>
#include <marsio.h>
+#include <mr_rte_msg.h>
#include <mrapp.h>
#include <protect.h>
#include <rte_acl_osdep.h>
@@ -59,7 +58,7 @@ struct mr_instance * _current_instance = NULL;
#endif
#define MR_VDEV_BUFFER_SIZE 512
-#define MR_MP_CACHE_SIZE 256
+#define MR_MP_CACHE_SIZE 256
/* 写入Command参数 */
static void __write_arg(char * eal_argv[], unsigned int * eal_argc, unsigned int max_argc, const char * value)
@@ -145,8 +144,7 @@ static void mrapp_rx_notify_init(struct mr_instance * instance)
MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_throttle_notify_threshold",
&instance->zero_recv_notify_threshold, 256);
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_enable", &instance->en_notify,
- 1);
+ MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_enable", &instance->en_notify, 1);
MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "vdev_buffer_size", &instance->sz_vdev_buffer,
MR_VDEV_BUFFER_SIZE);
MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "mp_cache_size", &instance->sz_mp_cache,
@@ -241,305 +239,162 @@ static void mrapp_eal_init(struct mr_instance * instance)
return;
}
-static int mrapp_ctrlmsg_init(struct mr_instance * instance)
+struct app_register_req * app_register_request_construct(struct mr_instance * instance)
{
- char str_ctrlmsg_addr[MR_STRING_MAX] = {0};
- unsigned int ctrlmsg_port = 0;
+ struct app_register_req * reg_req = ZMALLOC(sizeof(struct app_register_req));
+ MR_VERIFY_MALLOC(reg_req);
- /* 读控制面监听线程的监听地址、端口号 */
- // MESA_load_profile_string_def(instance->g_cfgfile_path, "ctrlmsg", "listen_addr",
- // str_ctrlmsg_addr, sizeof(str_ctrlmsg_addr), CTRLMSG_DEFAULT_ADDR);
+ /* 应用标识符 */
+ strncpy((char *)reg_req->symbol, instance->appsym, sizeof(reg_req->symbol));
+ /* 状态监控文件,委托服务进程销毁 */
+ strncpy((char *)reg_req->mntfile, instance->monit_file_path, sizeof(reg_req->mntfile));
+ /* 进程号,便于应用跟踪,查找问题 */
+ reg_req->pid = getpid();
- const char * env_ctrlmsg_addr = getenv("MRZCPD_CTRLMSG_LISTEN_ADDR");
- if (env_ctrlmsg_addr != NULL)
+ return reg_req;
+}
+
+void * marsio_survival_monitor(void * arg)
+{
+ int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (epoll_fd < 0)
{
- MR_INFO("MRZCPD_CTRLMSG_LISTEN_ADDR is %s", env_ctrlmsg_addr);
- snprintf(str_ctrlmsg_addr, sizeof(str_ctrlmsg_addr), "%s", env_ctrlmsg_addr);
+ MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno));
+ return NULL;
}
- else
+ epoll_add_event(epoll_fd, _current_instance->marsio_fd, EPOLLIN);
+ struct epoll_event evlist[__EV_MAX_EVENTS];
+ while (1)
{
- MR_WARNING("MRZCPD_CTRLMSG_LISTEN_ADDR is not set, default is " CTRLMSG_DEFAULT_ADDR);
- snprintf(str_ctrlmsg_addr, sizeof(str_ctrlmsg_addr), "%s", CTRLMSG_DEFAULT_ADDR);
- }
+ int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, -1);
+ if (ret == -1 && errno == EINTR)
+ continue;
+ else if (ret == -1)
+ {
+ MR_ERROR("loop thread failed : %s", strerror(errno));
+ return NULL;
+ }
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "ctrlmsg", "listen_port", &ctrlmsg_port, CTRLMSG_DEFAULT_PORT);
+ for (int i = 0; i < ret; i++)
+ {
+ /* 异常链接|数据到达 */
+ if (evlist[i].events & (EPOLLHUP | EPOLLERR | EPOLLIN))
+ {
+ MR_INFO("marsio is terminated, Exit. ");
+ exit(EXIT_FAILURE);
+ }
+ }
+ }
+}
- /* 地址转换 */
- struct sockaddr_in sockaddr_in;
- if (inet_pton(AF_INET, str_ctrlmsg_addr, &sockaddr_in.sin_addr) <= 0)
+int send_register_request(const struct app_register_req * reg_req)
+{
+ int ret;
+ int socket_pair[2];
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair);
+ if (ret < 0)
{
- // MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "ctrlmsg", "listen_addr");fd
- MR_ERROR("Mrapp ctrlmsg init error,The environment variable 'MRZCPD_CTRLMSG_LISTEN_ADDR=%s' is invalid.",
- str_ctrlmsg_addr);
+ MR_WARNING("Failed to create socketpair");
return RT_ERR;
}
- /* 端口 */
- sockaddr_in.sin_port = htons(ctrlmsg_port);
- sockaddr_in.sin_family = AF_INET;
-
- /* 创建消息处理框架句柄 */
- instance->ctrlmsg_handler = ctrlmsg_handler_create(CTRLMSG_HANDLER_MODE_CLIENT, sockaddr_in, NULL, -1);
- if (instance->ctrlmsg_handler == NULL)
- return RT_ERR;
- return RT_SUCCESS;
-}
-
-static int mrapp_distributer_init(struct mr_instance * instance)
-{
- unsigned int distmode = LDBC_DIST_OUTER_TUPLE2;
- unsigned int hashmode = LDBC_HASH_SYM_CRC;
+ struct rte_mp_reply mp_reply;
+ const struct timespec wait_timespec = {
+ .tv_nsec = 0,
+ .tv_sec = 30,
+ };
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "distmode", &distmode, LDBC_DIST_OUTER_TUPLE2);
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "hashmode", &hashmode, LDBC_HASH_SYM_CRC);
+ uintptr_t ptr_address = (uintptr_t)reg_req;
- if (distmode < 0 || distmode >= LDBC_DIST_MAX)
+ struct rte_mp_msg reg_msg = {};
+ strncpy(reg_msg.name, "instance_alive_register", sizeof(reg_msg.name) - 1);
+ reg_msg.fds[0] = socket_pair[0];
+ reg_msg.num_fds = 1;
+ reg_msg.len_param = sizeof(ptr_address);
+ memcpy(reg_msg.param, &ptr_address, sizeof(ptr_address));
+ ret = rte_mp_request_sync(&reg_msg, &mp_reply, &wait_timespec);
+ if (ret < 0)
{
- MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "distmode");
- return RT_ERR;
+ MR_WARNING("Failed to execute rte_mp_request_sync:%s", rte_strerror(rte_errno));
+ goto error;
}
- if (hashmode < 0 || distmode >= LDBC_HASH_MAX)
+ uintptr_t stored_ptr_address;
+ memcpy(&stored_ptr_address, mp_reply.msgs->param, sizeof(uintptr_t));
+ struct app_register_resp * rep_msg = (struct app_register_resp *)stored_ptr_address;
+
+ // 应用注册失败,退出。
+ if (rep_msg->errcode != 0)
{
- MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "hashmode");
- return RT_ERR;
+ MR_ERROR("%s", rep_msg->strerr);
+ goto error;
}
- instance->dist_object = distributer_create(distmode, hashmode, 0);
- if (instance->dist_object == NULL)
+ // Make sure the socket_pair[0] of the main process is closed and the kernel sends EOF
+ _current_instance->marsio_fd = socket_pair[1];
+ close(socket_pair[0]);
+ pthread_t __pid = 0;
+ ret = pthread_create(&__pid, NULL, marsio_survival_monitor, NULL);
+ if (ret < 0)
{
- MR_ERROR("Create distributer handler failed. ");
- return RT_ERR;
+ MR_ERROR("Launch marsio_survival_monitor thread failed : %s", strerror(errno));
+ goto error;
}
+ FREE(reg_req);
+ FREE(rep_msg);
return RT_SUCCESS;
-}
-
-#if 0
-static unsigned __table_strip(char * str, unsigned len)
-{
- int newlen = len;
- if (len == 0)
- return 0;
- if (isspace(str[len - 1]))
+error:
+ if (reg_req != NULL)
{
- /* strip trailing whitespace */
- while (newlen > 0 && isspace(str[newlen - 1]))
- str[--newlen] = '\0';
+ FREE(reg_req);
}
-
- if (isspace(str[0]))
+ if (rep_msg != NULL)
{
- /* strip leading whitespace */
- int i, start = 1;
- while (isspace(str[start]) && start < newlen)
- start++; /* do nothing */
- newlen -= start;
- for (i = 0; i < newlen; i++)
- str[i] = str[i + start];
- str[i] = '\0';
+ FREE(rep_msg);
}
-
- return newlen;
+ close(socket_pair[0]);
+ close(socket_pair[1]);
+ return RT_ERR;
}
-static const char * __table_readline(FILE * fp, char * buffer, size_t sz_buffer, size_t * buffer_len,
- unsigned int * line_no)
+// Register application
+static int mrapp_register(struct mr_instance * instance)
{
- while (fgets(buffer, sz_buffer, fp) != NULL)
- {
- char * pos = NULL;
- size_t len = strnlen(buffer, sz_buffer);
- (*line_no)++;
-
- if ((len >= sizeof(buffer) - 1) && (buffer[len - 1] != '\n'))
- {
- continue;
- }
-
- pos = memchr(buffer, '#', sz_buffer);
- if (pos != NULL)
- {
- *pos = '\0';
- len = pos - buffer;
- }
-
- len = __table_strip(buffer, len);
- if (len == 0)
- {
- continue;
- }
-
- return buffer;
- }
-
- return NULL;
+ struct app_register_req * reg_req = app_register_request_construct(instance);
+ return send_register_request(reg_req);
}
-static int __table_split_line(const char * buffer, char str_tokens[MR_TOKENS_MAX][MR_STRING_MAX])
+static int mrapp_distributer_init(struct mr_instance * instance)
{
- char * __buffer = strdup(buffer);
- assert(__buffer != NULL);
-
- char * token_ptr;
- unsigned int total_nr_tokens = 0;
-
- while ((token_ptr = strsep(&__buffer, " \t")) != NULL)
- {
- if (strlen(token_ptr) == 0)
- continue;
- if (total_nr_tokens == MR_TOKENS_MAX)
- break;
-
- strncpy(str_tokens[total_nr_tokens], token_ptr, MR_STRING_MAX);
-
- int len = strnlen(str_tokens[total_nr_tokens], MR_STRING_MAX);
- __table_strip(str_tokens[total_nr_tokens], len);
-
- total_nr_tokens++;
- }
+ unsigned int distmode = LDBC_DIST_OUTER_TUPLE2;
+ unsigned int hashmode = LDBC_HASH_SYM_CRC;
- free(__buffer);
- return total_nr_tokens;
-}
-#endif
+ MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "distmode", &distmode, LDBC_DIST_OUTER_TUPLE2);
+ MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "hashmode", &hashmode, LDBC_HASH_SYM_CRC);
-#if 0
-static int mrapp_neigh_init(struct mr_instance * instance)
-{
- instance->neigh = malloc(sizeof(struct neighbour_manager));
- MR_VERIFY_MALLOC(instance->neigh);
-
- unsigned int neigh_max_entries;
- unsigned int neigh_timeout;
- unsigned int arp_send_interval;
-
- /* 临时静态邻居表初始化
- 该表仅限初始化时使用,设备打开后,该表项读取邻居子系统。
- */
- TAILQ_INIT(&instance->static_neigh_list);
-
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "neigh_table_max_entries", &neigh_max_entries,
- MRAPP_DEFAULT_NEIGH_TABLE_MAX_ENTRIES);
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "neigh_table_timeout", &neigh_timeout,
- MRAPP_DEFAULT_NEIGH_TABLE_TIMEOUT);
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "arp_send_interval", &arp_send_interval,
- MRAPP_DEFAULT_NEIGH_ARP_SEND_INTERVAL);
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "gratuitous_arp_send",
- &instance->nr_gratuitous_arp_send, MRAPP_DEFAULT_NEIGH_GRATUITOUS_ARP_SEND);
-
- /* 邻居子系统初始化 */
- if (neighbour_mamanger_init(instance->neigh, instance->appsym, neigh_max_entries, neigh_timeout,
- arp_send_interval) != RT_SUCCESS)
+ if (distmode < 0 || distmode >= LDBC_DIST_MAX)
{
- goto err;
+ MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "distmode");
+ return RT_ERR;
}
- /* 读静态配置表 */
- FILE * fp_static_neigh_table = fopen(MRAPP_STATIC_NEIGH_FILE_PATH, "r");
- if (fp_static_neigh_table == NULL)
+ if (hashmode < 0 || distmode >= LDBC_HASH_MAX)
{
- MR_DEBUG("Skip reading static neighbour table from file %s.", MRAPP_STATIC_NEIGH_FILE_PATH);
- goto success;
+ MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "hashmode");
+ return RT_ERR;
}
- /* 按行读 */
- char __tb_line_buffer[MR_STRING_MAX];
- size_t __tb_buffer_len = 0;
- unsigned int __tb_line_no = 0;
-
- while (__table_readline(fp_static_neigh_table, __tb_line_buffer, sizeof(__tb_line_buffer), &__tb_buffer_len,
- &__tb_line_no) != NULL)
+ instance->dist_object = distributer_create(distmode, hashmode, 0);
+ if (instance->dist_object == NULL)
{
- /* 拆分,第一列为IP地址,第二列为MAC地址 */
- char __split_str[MR_TOKENS_MAX][MR_STRING_MAX];
- memset(__split_str, 0, sizeof(__split_str));
-
- int nr_tokens = __table_split_line(__tb_line_buffer, __split_str);
- if (nr_tokens != 3)
- {
- MR_WARNING("Table: %s, line: %u: Invalid line format, must have 3 column, ignore.",
- MRAPP_STATIC_NEIGH_FILE_PATH, __tb_line_no);
- continue;
- }
-
- /* 读IP地址、MAC地址 */
- const char * str_in_addr = __split_str[0];
- const char * str_ether_addr = __split_str[1];
- const char * str_device = __split_str[2];
-
- /* 字符串转换 */
- struct in_addr in_addr;
- struct rte_ether_addr ether_addr;
-
- int ret = inet_pton(AF_INET, str_in_addr, &in_addr);
- if (ret < 0)
- {
- MR_WARNING("Table: %s, line: %u: Invaild IP address format, ignore. ", MRAPP_STATIC_NEIGH_FILE_PATH,
- __tb_line_no);
- continue;
- }
-
- ret = sscanf(str_ether_addr, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", &ether_addr.addr_bytes[0],
- &ether_addr.addr_bytes[1], &ether_addr.addr_bytes[2], &ether_addr.addr_bytes[3],
- &ether_addr.addr_bytes[4], &ether_addr.addr_bytes[5]);
-
- if (ret != 6)
- {
- MR_WARNING("Table: %s, line: %u: Invaild MAC address format, ignore. ", MRAPP_STATIC_NEIGH_FILE_PATH,
- __tb_line_no);
- continue;
- }
-
- /* 检查重复的表项 */
- struct mr_static_neigh_entry * __neigh_entry_iter;
- unsigned int is_dup_entry = 0;
-
- TAILQ_FOREACH(__neigh_entry_iter, &instance->static_neigh_list, next)
- {
- if (__neigh_entry_iter->in_addr.s_addr != in_addr.s_addr)
- continue;
-
- /* 重复表项,告警 */
- MR_WARNING("table: %s, line: %u: Duplicate entry, %s->%s, ignore.", MRAPP_STATIC_NEIGH_FILE_PATH,
- __tb_line_no, str_in_addr, str_ether_addr);
- is_dup_entry = 1;
- break;
- }
-
- /* 重复表项跳过 */
- if (is_dup_entry)
- continue;
-
- /* 插入临时的静态邻居表 */
- struct mr_static_neigh_entry * __neigh_entry = malloc(sizeof(struct mr_static_neigh_entry));
- memset(__neigh_entry, 0, sizeof(struct mr_static_neigh_entry));
-
- __neigh_entry->in_addr = in_addr;
- __neigh_entry->ether_addr = ether_addr;
- strncpy(__neigh_entry->devsym, str_device, sizeof(__neigh_entry->devsym));
-
- /* 拷贝IP地址、MAC地址的字符串形式,便于后面显示日志信息 */
- strncpy(__neigh_entry->str_in_addr, str_in_addr, sizeof(__neigh_entry->str_in_addr));
- strncpy(__neigh_entry->str_ether_addr, str_ether_addr, sizeof(__neigh_entry->str_ether_addr));
-
- TAILQ_INSERT_TAIL(&instance->static_neigh_list, __neigh_entry, next);
+ MR_ERROR("Create distributer handler failed. ");
+ return RT_ERR;
}
-success:
return RT_SUCCESS;
-
-err:
- if (instance->neigh != NULL)
- {
- free(instance->neigh);
- instance->neigh = NULL;
- }
-
- return RT_ERR;
}
-#endif
static void mp_cache_init_for_each_mp(struct rte_mempool * mp, void * arg)
{
@@ -576,35 +431,6 @@ static void mpapp_mp_cache_init(struct mr_instance * instance)
rte_mempool_walk(mp_cache_init_for_each_mp, (void *)instance);
}
-/* 注册应用 */
-static int mrapp_app_register(struct mr_instance * instance)
-{
- struct ctrl_msg_app_reg_request reg_cmd;
- memset(&reg_cmd, 0, sizeof(reg_cmd));
-
- ctrl_msg_header_construct(&reg_cmd.msg_header, sizeof(reg_cmd), CTRL_MSG_TYPE_REQUEST, CTRLMSG_TOPIC_APP_REGISTER);
-
- /* 应用标识符 */
- strncpy((char *)reg_cmd.symbol, instance->appsym, sizeof(reg_cmd.symbol));
- /* 状态监控文件,委托服务进程销毁 */
- strncpy((char *)reg_cmd.mntfile, instance->monit_file_path, sizeof(reg_cmd.mntfile));
- /* 进程号,便于应用跟踪,查找问题 */
- reg_cmd.pid = getpid();
-
- ctrlmsg_msg_send(instance->ctrlmsg_handler, NULL, (struct ctrl_msg_header *)(&reg_cmd));
-
- /* TODO: 抽象出单独的函数 */
- pthread_mutex_lock(&instance->lock_ctrlmsg_wait);
- while (instance->ctrlmsg_wait == 0)
- {
- pthread_cond_wait(&instance->cond_ctrlmsg_wait, &instance->lock_ctrlmsg_wait);
- }
-
- instance->ctrlmsg_wait = 0;
- pthread_mutex_unlock(&instance->lock_ctrlmsg_wait);
- return RT_SUCCESS;
-}
-
/* 读全局配置文件路径等信息 */
static int mrapp_gconf_init(struct mr_instance * instance)
{
@@ -684,75 +510,6 @@ struct mr_vdev * marsio_device_lookup(struct mr_instance * instance, const char
return NULL;
}
-static int __ctrlplane_conn_close_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn,
- struct ctrl_msg_header * msg, void * arg)
-{
- MR_INFO("Ctrlplane connection is terminated, Exit. ");
- exit(EXIT_FAILURE);
-}
-
-#if 0
-static int __open_device_response_handler(struct ctrlmsg_handler * ct_hand,
- struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg)
-{
- struct ctrl_msg_vdev_open_response * rep_msg = (struct ctrl_msg_vdev_open_response *)msg;
- struct mr_instance * instance = (struct mr_instance *)arg;
-
- /* 打开失败 */
- if (rep_msg->msg_err.errcode != 0)
- {
- MR_ERROR("%s", rep_msg->msg_err.strerr);
- goto wake_up;
- }
-
- /* 打开成功 */
- struct mr_vdev * mr_vdev = &instance->vdevs[instance->nr_vdevs];
-
- mr_vdev->vdi = (struct vdev_instance *)rep_msg->ptr_vdi;
- mr_vdev->nr_rxstream = rep_msg->nr_rxstream;
- mr_vdev->nr_txstream = rep_msg->nr_txstream;
- mr_vdev->instance = instance;
- strncpy(mr_vdev->devsym, (char *)rep_msg->devsym, MR_SYMBOL_MAX);
-
- /* VDI使用了大页面共享内存,在保护模式下撤除对它的保护 */
- if (instance->memory_protect_with_asan)
- {
- __open_device_unposion(mr_vdev->vdi);
- }
-
- instance->nr_vdevs++;
-
-wake_up:
- pthread_mutex_lock(&instance->lock_ctrlmsg_wait);
- instance->ctrlmsg_wait = 1;
- pthread_cond_broadcast(&instance->cond_ctrlmsg_wait);
- pthread_mutex_unlock(&instance->lock_ctrlmsg_wait);
- return 0;
-}
-
-#endif
-
-static int __app_register_response_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn,
- struct ctrl_msg_header * msg, void * arg)
-{
- struct ctrl_msg_app_reg_response * rep_msg = (struct ctrl_msg_app_reg_response *)msg;
- struct mr_instance * instance = (struct mr_instance *)arg;
-
- // 应用注册失败,退出。
- if (rep_msg->msg_err.errcode != 0)
- {
- MR_ERROR("%s", rep_msg->msg_err.strerr);
- exit(EXIT_FAILURE);
- }
-
- // 注册成功,唤醒等待的线程。
- pthread_mutex_lock(&instance->lock_ctrlmsg_wait);
- instance->ctrlmsg_wait = 1;
- pthread_cond_broadcast(&instance->cond_ctrlmsg_wait);
- pthread_mutex_unlock(&instance->lock_ctrlmsg_wait);
- return 0;
-}
-
struct mr_vdev * marsio_open_device(struct mr_instance * instance, const char * devsym, unsigned int nr_rxstream,
unsigned int nr_txstream)
{
@@ -969,10 +726,6 @@ int marsio_init(struct mr_instance * instance, const char * appsym)
/* 状态监测路径 */
snprintf(instance->monit_file_path, sizeof(instance->monit_file_path), MRAPP_MONIT_FILE_PATH, instance->appsym);
- pthread_mutex_init(&instance->lock_ctrlmsg_wait, NULL);
- pthread_cond_init(&instance->cond_ctrlmsg_wait, NULL);
- pthread_mutex_init(&instance->lock_thread_init, NULL);
-
/* 根据CPU_MASK计算线程数 */
if (instance->nr_dataplane_thread == 0 && CPU_COUNT(&instance->cpu_set) != 0)
{
@@ -985,45 +738,16 @@ int marsio_init(struct mr_instance * instance, const char * appsym)
goto err;
}
- /* 初始化消息通信框架
- 在EAL环境启动之前启动消息通信框架,避免SERVICE启动之前启动APP
- */
- if (mrapp_ctrlmsg_init(instance) != RT_SUCCESS)
- {
- MR_ERROR("Ctrlmsg module initialization failed, recheck mrzcpd is running.");
- goto err;
- }
-
- /* 初始化EAL环境 */
mrapp_eal_init(instance);
- mrapp_rx_notify_init(instance);
-
- /* 注册处理应用注册结果的回调函数 */
- ctrlmsg_msg_reciver_register(instance->ctrlmsg_handler, CTRLMSG_TOPIC_APP_REGISTER, CTRL_MSG_TYPE_RESPONSE,
- __app_register_response_handler, instance);
-
- /* 注册设备打开的回调函数 */
-
-#if 0
- ctrlmsg_msg_reciver_register(instance->ctrlmsg_handler, CTRLMSG_TOPIC_VDEV_OPEN,
- CTRL_MSG_TYPE_RESPONSE, __open_device_response_handler, instance);
-#endif
- /* 控制链接中断处理函数 */
- ctrlmsg_event_conn_close_register(instance->ctrlmsg_handler, __ctrlplane_conn_close_handler, instance);
-
- if (ctrlmsg_thread_launch(instance->ctrlmsg_handler) != RT_SUCCESS)
+ if (mrapp_register(instance) != RT_SUCCESS)
{
- MR_ERROR("Launch ctrlmsg thread failed. ");
+ MR_ERROR("app register failed, recheck mrzcpd is running.");
goto err;
}
- /* 应用注册 */
- if (mrapp_app_register(instance) != RT_SUCCESS)
- {
- MR_ERROR("App register failed. ");
- goto err;
- }
+ /* 初始化EAL环境 */
+ mrapp_rx_notify_init(instance);
/* 负载均衡器 */
if (mrapp_distributer_init(instance) != RT_SUCCESS)