diff options
| author | 童宗振 <[email protected]> | 2023-12-19 05:32:07 +0000 |
|---|---|---|
| committer | 陆秋文 <[email protected]> | 2023-12-19 05:32:07 +0000 |
| commit | b3634ff0763a7c33093c9dbd7ce0e2c53b65c2b6 (patch) | |
| tree | 274731c8844b8665863fb9b701557ec64b478281 /app | |
| parent | 2c1c5fb4387c47e40d9cd02f2abbbfaf31ee9b38 (diff) | |
refactor-ctrlmsg-2
Diffstat (limited to 'app')
| -rw-r--r-- | app/include/mrapp.h | 14 | ||||
| -rw-r--r-- | app/src/marsio.c | 506 |
2 files changed, 119 insertions, 401 deletions
diff --git a/app/include/mrapp.h b/app/include/mrapp.h index 9e1a53a..3e6da2c 100644 --- a/app/include/mrapp.h +++ b/app/include/mrapp.h @@ -1,14 +1,14 @@ #pragma once #include <common.h> -#include <ctrlmsg.h> #include <ldbc.h> #include <marsio.h> +#include <mr_rte_msg.h> #include <neigh.h> #include <pcap/pcap.h> +#include <rte_epoll.h> #include <tap.h> #include <vdev_define.h> -#include <rte_epoll.h> struct mr_instance; @@ -112,18 +112,10 @@ struct mr_instance char g_cfgfile_path[MR_STRING_MAX]; /* 应用配置文件路径 */ char app_cfgfile_path[MR_STRING_MAX]; - /* 消息框架句柄 */ - struct ctrlmsg_handler * ctrlmsg_handler; /* 虚设备实例列表 */ struct mr_vdev vdevs[MR_VDEV_MAX]; /* 虚设备实例数量 */ unsigned int nr_vdevs; - /* 异步转同步,等待控制回复 */ - pthread_cond_t cond_ctrlmsg_wait; - /* 等待控制回复锁 */ - pthread_mutex_t lock_ctrlmsg_wait; - /* 控制回复到达,置1表示有控制回复需要处理 */ - unsigned int ctrlmsg_wait; /* 邻居管理器 */ struct neighbour_manager * neigh; /* 负载均衡器 */ @@ -132,6 +124,8 @@ struct mr_instance struct mr_static_neigh_list static_neigh_list; /* 统计 */ struct mrapp_stat stat[MR_SID_MAX]; + /* 监控marsio是否存活的句柄*/ + int marsio_fd; /* ===== 运行选项 ===== */ diff --git a/app/src/marsio.c b/app/src/marsio.c index 995ea33..578399e 100644 --- a/app/src/marsio.c +++ b/app/src/marsio.c @@ -12,10 +12,9 @@ #include <arp.h> #include <arpa/inet.h> #include <cJSON.h> -#include <ctrlmsg.h> -#include <ctrlmsg_define.h> #include <libgen.h> #include <marsio.h> +#include <mr_rte_msg.h> #include <mrapp.h> #include <protect.h> #include <rte_acl_osdep.h> @@ -59,7 +58,7 @@ struct mr_instance * _current_instance = NULL; #endif #define MR_VDEV_BUFFER_SIZE 512 -#define MR_MP_CACHE_SIZE 256 +#define MR_MP_CACHE_SIZE 256 /* 写入Command参数 */ static void __write_arg(char * eal_argv[], unsigned int * eal_argc, unsigned int max_argc, const char * value) @@ -145,8 +144,7 @@ static void mrapp_rx_notify_init(struct mr_instance * instance) MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_throttle_notify_threshold", &instance->zero_recv_notify_threshold, 256); - MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_enable", &instance->en_notify, - 1); + MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_enable", &instance->en_notify, 1); MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "vdev_buffer_size", &instance->sz_vdev_buffer, MR_VDEV_BUFFER_SIZE); MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "mp_cache_size", &instance->sz_mp_cache, @@ -241,305 +239,162 @@ static void mrapp_eal_init(struct mr_instance * instance) return; } -static int mrapp_ctrlmsg_init(struct mr_instance * instance) +struct app_register_req * app_register_request_construct(struct mr_instance * instance) { - char str_ctrlmsg_addr[MR_STRING_MAX] = {0}; - unsigned int ctrlmsg_port = 0; + struct app_register_req * reg_req = ZMALLOC(sizeof(struct app_register_req)); + MR_VERIFY_MALLOC(reg_req); - /* 读控制面监听线程的监听地址、端口号 */ - // MESA_load_profile_string_def(instance->g_cfgfile_path, "ctrlmsg", "listen_addr", - // str_ctrlmsg_addr, sizeof(str_ctrlmsg_addr), CTRLMSG_DEFAULT_ADDR); + /* 应用标识符 */ + strncpy((char *)reg_req->symbol, instance->appsym, sizeof(reg_req->symbol)); + /* 状态监控文件,委托服务进程销毁 */ + strncpy((char *)reg_req->mntfile, instance->monit_file_path, sizeof(reg_req->mntfile)); + /* 进程号,便于应用跟踪,查找问题 */ + reg_req->pid = getpid(); - const char * env_ctrlmsg_addr = getenv("MRZCPD_CTRLMSG_LISTEN_ADDR"); - if (env_ctrlmsg_addr != NULL) + return reg_req; +} + +void * marsio_survival_monitor(void * arg) +{ + int epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (epoll_fd < 0) { - MR_INFO("MRZCPD_CTRLMSG_LISTEN_ADDR is %s", env_ctrlmsg_addr); - snprintf(str_ctrlmsg_addr, sizeof(str_ctrlmsg_addr), "%s", env_ctrlmsg_addr); + MR_ERROR("Cannot create epoll fd: %s. ", strerror(errno)); + return NULL; } - else + epoll_add_event(epoll_fd, _current_instance->marsio_fd, EPOLLIN); + struct epoll_event evlist[__EV_MAX_EVENTS]; + while (1) { - MR_WARNING("MRZCPD_CTRLMSG_LISTEN_ADDR is not set, default is " CTRLMSG_DEFAULT_ADDR); - snprintf(str_ctrlmsg_addr, sizeof(str_ctrlmsg_addr), "%s", CTRLMSG_DEFAULT_ADDR); - } + int ret = epoll_wait(epoll_fd, evlist, __EV_MAX_EVENTS, -1); + if (ret == -1 && errno == EINTR) + continue; + else if (ret == -1) + { + MR_ERROR("loop thread failed : %s", strerror(errno)); + return NULL; + } - MESA_load_profile_uint_def(instance->g_cfgfile_path, "ctrlmsg", "listen_port", &ctrlmsg_port, CTRLMSG_DEFAULT_PORT); + for (int i = 0; i < ret; i++) + { + /* 异常链接|数据到达 */ + if (evlist[i].events & (EPOLLHUP | EPOLLERR | EPOLLIN)) + { + MR_INFO("marsio is terminated, Exit. "); + exit(EXIT_FAILURE); + } + } + } +} - /* 地址转换 */ - struct sockaddr_in sockaddr_in; - if (inet_pton(AF_INET, str_ctrlmsg_addr, &sockaddr_in.sin_addr) <= 0) +int send_register_request(const struct app_register_req * reg_req) +{ + int ret; + int socket_pair[2]; + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair); + if (ret < 0) { - // MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "ctrlmsg", "listen_addr");fd - MR_ERROR("Mrapp ctrlmsg init error,The environment variable 'MRZCPD_CTRLMSG_LISTEN_ADDR=%s' is invalid.", - str_ctrlmsg_addr); + MR_WARNING("Failed to create socketpair"); return RT_ERR; } - /* 端口 */ - sockaddr_in.sin_port = htons(ctrlmsg_port); - sockaddr_in.sin_family = AF_INET; - - /* 创建消息处理框架句柄 */ - instance->ctrlmsg_handler = ctrlmsg_handler_create(CTRLMSG_HANDLER_MODE_CLIENT, sockaddr_in, NULL, -1); - if (instance->ctrlmsg_handler == NULL) - return RT_ERR; - return RT_SUCCESS; -} - -static int mrapp_distributer_init(struct mr_instance * instance) -{ - unsigned int distmode = LDBC_DIST_OUTER_TUPLE2; - unsigned int hashmode = LDBC_HASH_SYM_CRC; + struct rte_mp_reply mp_reply; + const struct timespec wait_timespec = { + .tv_nsec = 0, + .tv_sec = 30, + }; - MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "distmode", &distmode, LDBC_DIST_OUTER_TUPLE2); - MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "hashmode", &hashmode, LDBC_HASH_SYM_CRC); + uintptr_t ptr_address = (uintptr_t)reg_req; - if (distmode < 0 || distmode >= LDBC_DIST_MAX) + struct rte_mp_msg reg_msg = {}; + strncpy(reg_msg.name, "instance_alive_register", sizeof(reg_msg.name) - 1); + reg_msg.fds[0] = socket_pair[0]; + reg_msg.num_fds = 1; + reg_msg.len_param = sizeof(ptr_address); + memcpy(reg_msg.param, &ptr_address, sizeof(ptr_address)); + ret = rte_mp_request_sync(®_msg, &mp_reply, &wait_timespec); + if (ret < 0) { - MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "distmode"); - return RT_ERR; + MR_WARNING("Failed to execute rte_mp_request_sync:%s", rte_strerror(rte_errno)); + goto error; } - if (hashmode < 0 || distmode >= LDBC_HASH_MAX) + uintptr_t stored_ptr_address; + memcpy(&stored_ptr_address, mp_reply.msgs->param, sizeof(uintptr_t)); + struct app_register_resp * rep_msg = (struct app_register_resp *)stored_ptr_address; + + // 应用注册失败,退出。 + if (rep_msg->errcode != 0) { - MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "hashmode"); - return RT_ERR; + MR_ERROR("%s", rep_msg->strerr); + goto error; } - instance->dist_object = distributer_create(distmode, hashmode, 0); - if (instance->dist_object == NULL) + // Make sure the socket_pair[0] of the main process is closed and the kernel sends EOF + _current_instance->marsio_fd = socket_pair[1]; + close(socket_pair[0]); + pthread_t __pid = 0; + ret = pthread_create(&__pid, NULL, marsio_survival_monitor, NULL); + if (ret < 0) { - MR_ERROR("Create distributer handler failed. "); - return RT_ERR; + MR_ERROR("Launch marsio_survival_monitor thread failed : %s", strerror(errno)); + goto error; } + FREE(reg_req); + FREE(rep_msg); return RT_SUCCESS; -} - -#if 0 -static unsigned __table_strip(char * str, unsigned len) -{ - int newlen = len; - if (len == 0) - return 0; - if (isspace(str[len - 1])) +error: + if (reg_req != NULL) { - /* strip trailing whitespace */ - while (newlen > 0 && isspace(str[newlen - 1])) - str[--newlen] = '\0'; + FREE(reg_req); } - - if (isspace(str[0])) + if (rep_msg != NULL) { - /* strip leading whitespace */ - int i, start = 1; - while (isspace(str[start]) && start < newlen) - start++; /* do nothing */ - newlen -= start; - for (i = 0; i < newlen; i++) - str[i] = str[i + start]; - str[i] = '\0'; + FREE(rep_msg); } - - return newlen; + close(socket_pair[0]); + close(socket_pair[1]); + return RT_ERR; } -static const char * __table_readline(FILE * fp, char * buffer, size_t sz_buffer, size_t * buffer_len, - unsigned int * line_no) +// Register application +static int mrapp_register(struct mr_instance * instance) { - while (fgets(buffer, sz_buffer, fp) != NULL) - { - char * pos = NULL; - size_t len = strnlen(buffer, sz_buffer); - (*line_no)++; - - if ((len >= sizeof(buffer) - 1) && (buffer[len - 1] != '\n')) - { - continue; - } - - pos = memchr(buffer, '#', sz_buffer); - if (pos != NULL) - { - *pos = '\0'; - len = pos - buffer; - } - - len = __table_strip(buffer, len); - if (len == 0) - { - continue; - } - - return buffer; - } - - return NULL; + struct app_register_req * reg_req = app_register_request_construct(instance); + return send_register_request(reg_req); } -static int __table_split_line(const char * buffer, char str_tokens[MR_TOKENS_MAX][MR_STRING_MAX]) +static int mrapp_distributer_init(struct mr_instance * instance) { - char * __buffer = strdup(buffer); - assert(__buffer != NULL); - - char * token_ptr; - unsigned int total_nr_tokens = 0; - - while ((token_ptr = strsep(&__buffer, " \t")) != NULL) - { - if (strlen(token_ptr) == 0) - continue; - if (total_nr_tokens == MR_TOKENS_MAX) - break; - - strncpy(str_tokens[total_nr_tokens], token_ptr, MR_STRING_MAX); - - int len = strnlen(str_tokens[total_nr_tokens], MR_STRING_MAX); - __table_strip(str_tokens[total_nr_tokens], len); - - total_nr_tokens++; - } + unsigned int distmode = LDBC_DIST_OUTER_TUPLE2; + unsigned int hashmode = LDBC_HASH_SYM_CRC; - free(__buffer); - return total_nr_tokens; -} -#endif + MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "distmode", &distmode, LDBC_DIST_OUTER_TUPLE2); + MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "hashmode", &hashmode, LDBC_HASH_SYM_CRC); -#if 0 -static int mrapp_neigh_init(struct mr_instance * instance) -{ - instance->neigh = malloc(sizeof(struct neighbour_manager)); - MR_VERIFY_MALLOC(instance->neigh); - - unsigned int neigh_max_entries; - unsigned int neigh_timeout; - unsigned int arp_send_interval; - - /* 临时静态邻居表初始化 - 该表仅限初始化时使用,设备打开后,该表项读取邻居子系统。 - */ - TAILQ_INIT(&instance->static_neigh_list); - - MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "neigh_table_max_entries", &neigh_max_entries, - MRAPP_DEFAULT_NEIGH_TABLE_MAX_ENTRIES); - MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "neigh_table_timeout", &neigh_timeout, - MRAPP_DEFAULT_NEIGH_TABLE_TIMEOUT); - MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "arp_send_interval", &arp_send_interval, - MRAPP_DEFAULT_NEIGH_ARP_SEND_INTERVAL); - MESA_load_profile_uint_def(instance->g_cfgfile_path, "neigh", "gratuitous_arp_send", - &instance->nr_gratuitous_arp_send, MRAPP_DEFAULT_NEIGH_GRATUITOUS_ARP_SEND); - - /* 邻居子系统初始化 */ - if (neighbour_mamanger_init(instance->neigh, instance->appsym, neigh_max_entries, neigh_timeout, - arp_send_interval) != RT_SUCCESS) + if (distmode < 0 || distmode >= LDBC_DIST_MAX) { - goto err; + MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "distmode"); + return RT_ERR; } - /* 读静态配置表 */ - FILE * fp_static_neigh_table = fopen(MRAPP_STATIC_NEIGH_FILE_PATH, "r"); - if (fp_static_neigh_table == NULL) + if (hashmode < 0 || distmode >= LDBC_HASH_MAX) { - MR_DEBUG("Skip reading static neighbour table from file %s.", MRAPP_STATIC_NEIGH_FILE_PATH); - goto success; + MR_CFGERR_INVALID_FORMAT(instance->g_cfgfile_path, "service", "hashmode"); + return RT_ERR; } - /* 按行读 */ - char __tb_line_buffer[MR_STRING_MAX]; - size_t __tb_buffer_len = 0; - unsigned int __tb_line_no = 0; - - while (__table_readline(fp_static_neigh_table, __tb_line_buffer, sizeof(__tb_line_buffer), &__tb_buffer_len, - &__tb_line_no) != NULL) + instance->dist_object = distributer_create(distmode, hashmode, 0); + if (instance->dist_object == NULL) { - /* 拆分,第一列为IP地址,第二列为MAC地址 */ - char __split_str[MR_TOKENS_MAX][MR_STRING_MAX]; - memset(__split_str, 0, sizeof(__split_str)); - - int nr_tokens = __table_split_line(__tb_line_buffer, __split_str); - if (nr_tokens != 3) - { - MR_WARNING("Table: %s, line: %u: Invalid line format, must have 3 column, ignore.", - MRAPP_STATIC_NEIGH_FILE_PATH, __tb_line_no); - continue; - } - - /* 读IP地址、MAC地址 */ - const char * str_in_addr = __split_str[0]; - const char * str_ether_addr = __split_str[1]; - const char * str_device = __split_str[2]; - - /* 字符串转换 */ - struct in_addr in_addr; - struct rte_ether_addr ether_addr; - - int ret = inet_pton(AF_INET, str_in_addr, &in_addr); - if (ret < 0) - { - MR_WARNING("Table: %s, line: %u: Invaild IP address format, ignore. ", MRAPP_STATIC_NEIGH_FILE_PATH, - __tb_line_no); - continue; - } - - ret = sscanf(str_ether_addr, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", ðer_addr.addr_bytes[0], - ðer_addr.addr_bytes[1], ðer_addr.addr_bytes[2], ðer_addr.addr_bytes[3], - ðer_addr.addr_bytes[4], ðer_addr.addr_bytes[5]); - - if (ret != 6) - { - MR_WARNING("Table: %s, line: %u: Invaild MAC address format, ignore. ", MRAPP_STATIC_NEIGH_FILE_PATH, - __tb_line_no); - continue; - } - - /* 检查重复的表项 */ - struct mr_static_neigh_entry * __neigh_entry_iter; - unsigned int is_dup_entry = 0; - - TAILQ_FOREACH(__neigh_entry_iter, &instance->static_neigh_list, next) - { - if (__neigh_entry_iter->in_addr.s_addr != in_addr.s_addr) - continue; - - /* 重复表项,告警 */ - MR_WARNING("table: %s, line: %u: Duplicate entry, %s->%s, ignore.", MRAPP_STATIC_NEIGH_FILE_PATH, - __tb_line_no, str_in_addr, str_ether_addr); - is_dup_entry = 1; - break; - } - - /* 重复表项跳过 */ - if (is_dup_entry) - continue; - - /* 插入临时的静态邻居表 */ - struct mr_static_neigh_entry * __neigh_entry = malloc(sizeof(struct mr_static_neigh_entry)); - memset(__neigh_entry, 0, sizeof(struct mr_static_neigh_entry)); - - __neigh_entry->in_addr = in_addr; - __neigh_entry->ether_addr = ether_addr; - strncpy(__neigh_entry->devsym, str_device, sizeof(__neigh_entry->devsym)); - - /* 拷贝IP地址、MAC地址的字符串形式,便于后面显示日志信息 */ - strncpy(__neigh_entry->str_in_addr, str_in_addr, sizeof(__neigh_entry->str_in_addr)); - strncpy(__neigh_entry->str_ether_addr, str_ether_addr, sizeof(__neigh_entry->str_ether_addr)); - - TAILQ_INSERT_TAIL(&instance->static_neigh_list, __neigh_entry, next); + MR_ERROR("Create distributer handler failed. "); + return RT_ERR; } -success: return RT_SUCCESS; - -err: - if (instance->neigh != NULL) - { - free(instance->neigh); - instance->neigh = NULL; - } - - return RT_ERR; } -#endif static void mp_cache_init_for_each_mp(struct rte_mempool * mp, void * arg) { @@ -576,35 +431,6 @@ static void mpapp_mp_cache_init(struct mr_instance * instance) rte_mempool_walk(mp_cache_init_for_each_mp, (void *)instance); } -/* 注册应用 */ -static int mrapp_app_register(struct mr_instance * instance) -{ - struct ctrl_msg_app_reg_request reg_cmd; - memset(®_cmd, 0, sizeof(reg_cmd)); - - ctrl_msg_header_construct(®_cmd.msg_header, sizeof(reg_cmd), CTRL_MSG_TYPE_REQUEST, CTRLMSG_TOPIC_APP_REGISTER); - - /* 应用标识符 */ - strncpy((char *)reg_cmd.symbol, instance->appsym, sizeof(reg_cmd.symbol)); - /* 状态监控文件,委托服务进程销毁 */ - strncpy((char *)reg_cmd.mntfile, instance->monit_file_path, sizeof(reg_cmd.mntfile)); - /* 进程号,便于应用跟踪,查找问题 */ - reg_cmd.pid = getpid(); - - ctrlmsg_msg_send(instance->ctrlmsg_handler, NULL, (struct ctrl_msg_header *)(®_cmd)); - - /* TODO: 抽象出单独的函数 */ - pthread_mutex_lock(&instance->lock_ctrlmsg_wait); - while (instance->ctrlmsg_wait == 0) - { - pthread_cond_wait(&instance->cond_ctrlmsg_wait, &instance->lock_ctrlmsg_wait); - } - - instance->ctrlmsg_wait = 0; - pthread_mutex_unlock(&instance->lock_ctrlmsg_wait); - return RT_SUCCESS; -} - /* 读全局配置文件路径等信息 */ static int mrapp_gconf_init(struct mr_instance * instance) { @@ -684,75 +510,6 @@ struct mr_vdev * marsio_device_lookup(struct mr_instance * instance, const char return NULL; } -static int __ctrlplane_conn_close_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn, - struct ctrl_msg_header * msg, void * arg) -{ - MR_INFO("Ctrlplane connection is terminated, Exit. "); - exit(EXIT_FAILURE); -} - -#if 0 -static int __open_device_response_handler(struct ctrlmsg_handler * ct_hand, - struct ctrlmsg_conn * ct_conn, struct ctrl_msg_header * msg, void * arg) -{ - struct ctrl_msg_vdev_open_response * rep_msg = (struct ctrl_msg_vdev_open_response *)msg; - struct mr_instance * instance = (struct mr_instance *)arg; - - /* 打开失败 */ - if (rep_msg->msg_err.errcode != 0) - { - MR_ERROR("%s", rep_msg->msg_err.strerr); - goto wake_up; - } - - /* 打开成功 */ - struct mr_vdev * mr_vdev = &instance->vdevs[instance->nr_vdevs]; - - mr_vdev->vdi = (struct vdev_instance *)rep_msg->ptr_vdi; - mr_vdev->nr_rxstream = rep_msg->nr_rxstream; - mr_vdev->nr_txstream = rep_msg->nr_txstream; - mr_vdev->instance = instance; - strncpy(mr_vdev->devsym, (char *)rep_msg->devsym, MR_SYMBOL_MAX); - - /* VDI使用了大页面共享内存,在保护模式下撤除对它的保护 */ - if (instance->memory_protect_with_asan) - { - __open_device_unposion(mr_vdev->vdi); - } - - instance->nr_vdevs++; - -wake_up: - pthread_mutex_lock(&instance->lock_ctrlmsg_wait); - instance->ctrlmsg_wait = 1; - pthread_cond_broadcast(&instance->cond_ctrlmsg_wait); - pthread_mutex_unlock(&instance->lock_ctrlmsg_wait); - return 0; -} - -#endif - -static int __app_register_response_handler(struct ctrlmsg_handler * ct_hand, struct ctrlmsg_conn * ct_conn, - struct ctrl_msg_header * msg, void * arg) -{ - struct ctrl_msg_app_reg_response * rep_msg = (struct ctrl_msg_app_reg_response *)msg; - struct mr_instance * instance = (struct mr_instance *)arg; - - // 应用注册失败,退出。 - if (rep_msg->msg_err.errcode != 0) - { - MR_ERROR("%s", rep_msg->msg_err.strerr); - exit(EXIT_FAILURE); - } - - // 注册成功,唤醒等待的线程。 - pthread_mutex_lock(&instance->lock_ctrlmsg_wait); - instance->ctrlmsg_wait = 1; - pthread_cond_broadcast(&instance->cond_ctrlmsg_wait); - pthread_mutex_unlock(&instance->lock_ctrlmsg_wait); - return 0; -} - struct mr_vdev * marsio_open_device(struct mr_instance * instance, const char * devsym, unsigned int nr_rxstream, unsigned int nr_txstream) { @@ -969,10 +726,6 @@ int marsio_init(struct mr_instance * instance, const char * appsym) /* 状态监测路径 */ snprintf(instance->monit_file_path, sizeof(instance->monit_file_path), MRAPP_MONIT_FILE_PATH, instance->appsym); - pthread_mutex_init(&instance->lock_ctrlmsg_wait, NULL); - pthread_cond_init(&instance->cond_ctrlmsg_wait, NULL); - pthread_mutex_init(&instance->lock_thread_init, NULL); - /* 根据CPU_MASK计算线程数 */ if (instance->nr_dataplane_thread == 0 && CPU_COUNT(&instance->cpu_set) != 0) { @@ -985,45 +738,16 @@ int marsio_init(struct mr_instance * instance, const char * appsym) goto err; } - /* 初始化消息通信框架 - 在EAL环境启动之前启动消息通信框架,避免SERVICE启动之前启动APP - */ - if (mrapp_ctrlmsg_init(instance) != RT_SUCCESS) - { - MR_ERROR("Ctrlmsg module initialization failed, recheck mrzcpd is running."); - goto err; - } - - /* 初始化EAL环境 */ mrapp_eal_init(instance); - mrapp_rx_notify_init(instance); - - /* 注册处理应用注册结果的回调函数 */ - ctrlmsg_msg_reciver_register(instance->ctrlmsg_handler, CTRLMSG_TOPIC_APP_REGISTER, CTRL_MSG_TYPE_RESPONSE, - __app_register_response_handler, instance); - - /* 注册设备打开的回调函数 */ - -#if 0 - ctrlmsg_msg_reciver_register(instance->ctrlmsg_handler, CTRLMSG_TOPIC_VDEV_OPEN, - CTRL_MSG_TYPE_RESPONSE, __open_device_response_handler, instance); -#endif - /* 控制链接中断处理函数 */ - ctrlmsg_event_conn_close_register(instance->ctrlmsg_handler, __ctrlplane_conn_close_handler, instance); - - if (ctrlmsg_thread_launch(instance->ctrlmsg_handler) != RT_SUCCESS) + if (mrapp_register(instance) != RT_SUCCESS) { - MR_ERROR("Launch ctrlmsg thread failed. "); + MR_ERROR("app register failed, recheck mrzcpd is running."); goto err; } - /* 应用注册 */ - if (mrapp_app_register(instance) != RT_SUCCESS) - { - MR_ERROR("App register failed. "); - goto err; - } + /* 初始化EAL环境 */ + mrapp_rx_notify_init(instance); /* 负载均衡器 */ if (mrapp_distributer_init(instance) != RT_SUCCESS) |
