diff options
| -rw-r--r-- | app/include/mrapp.h | 18 | ||||
| -rw-r--r-- | app/src/marsio.c | 1 | ||||
| -rw-r--r-- | app/src/monit.c | 4 | ||||
| -rw-r--r-- | app/src/rawio.c | 63 | ||||
| -rw-r--r-- | app/src/version.map | 1 | ||||
| -rw-r--r-- | include/external/marsio.h | 4 | ||||
| -rw-r--r-- | include/internal/vdev_define.h | 7 | ||||
| -rw-r--r-- | infra/include/vnode.h | 16 | ||||
| -rw-r--r-- | infra/src/vnode_common.c | 8 | ||||
| -rw-r--r-- | infra/src/vnode_mirror.c | 39 | ||||
| -rw-r--r-- | service/src/monit.c | 7 | ||||
| -rw-r--r-- | service/src/vdata.c | 6 | ||||
| -rw-r--r-- | service/src/vdev.c | 2 |
13 files changed, 161 insertions, 15 deletions
diff --git a/app/include/mrapp.h b/app/include/mrapp.h index cc0ee7b..afe019e 100644 --- a/app/include/mrapp.h +++ b/app/include/mrapp.h @@ -24,8 +24,17 @@ struct mrapp_stat uint64_t packet_recv_drop; uint64_t packet_send_drop; + + uint64_t poll_wake_up_by_ev; + uint64_t poll_wake_up_timeout; }; +struct mr_vdev_counters +{ + unsigned int rx_burst_call_counters; + unsigned int rx_burst_zero_counters; +} __rte_cache_aligned; + /* 用户设备描述符 */ struct mr_vdev { @@ -40,6 +49,8 @@ struct mr_vdev int rx_notify_fds[MR_SID_MAX]; struct rte_epoll_event rx_notify_epoll_events[MR_SID_MAX]; unsigned int nr_rx_notify_fds; + + struct mr_vdev_counters counters[MR_SID_MAX]; }; struct mr_thread_info @@ -133,7 +144,12 @@ struct mr_instance /* rx epoll fd */ int rx_notify_epfd[MR_SID_MAX]; - unsigned int stat_notify_wake_up[MR_SID_MAX]; + unsigned int stat_notify_wake_up_from_ev[MR_SID_MAX]; + unsigned int stat_notify_wake_up_from_timeout[MR_SID_MAX]; + unsigned int busy_loop_counter[MR_SID_MAX]; + + /* threshold */ + unsigned int rx_notify_defer_threshold; /* mempool cache map */ struct mp_cache_map * mp_cache_map[MR_MEMPOOL_COUNT_MAX]; diff --git a/app/src/marsio.c b/app/src/marsio.c index 0ceb4b6..ad22ee7 100644 --- a/app/src/marsio.c +++ b/app/src/marsio.c @@ -972,6 +972,7 @@ int marsio_init(struct mr_instance * instance, const char * appsym) /* 初始化EAL环境 */ mrapp_eal_init(instance); mrapp_rx_notify_epfd_init(instance); + instance->rx_notify_defer_threshold = 128; /* 注册处理应用注册结果的回调函数 */ ctrlmsg_msg_reciver_register(instance->ctrlmsg_handler, CTRLMSG_TOPIC_APP_REGISTER, CTRL_MSG_TYPE_RESPONSE, diff --git a/app/src/monit.c b/app/src/monit.c index 98c8add..ff474fd 100644 --- a/app/src/monit.c +++ b/app/src/monit.c @@ -218,6 +218,8 @@ static cJSON * monit_app_stat(struct mr_instance * instance) _trans_app_stat(packet_send_drop); _trans_app_stat(mbuf_alloc_count); _trans_app_stat(mbuf_free_count); + _trans_app_stat(poll_wake_up_by_ev); + _trans_app_stat(poll_wake_up_timeout); /* 在用Buffer数量统计 */ uint64_t __trans_mbuf_in_use_count[MR_SID_MAX]; @@ -243,6 +245,8 @@ do { \ _json_generate(mbuf_alloc_count); _json_generate(mbuf_free_count); _json_generate(mbuf_in_use_count); + _json_generate(poll_wake_up_by_ev); + _json_generate(poll_wake_up_timeout); #undef _trans_app_stat #undef _trans_app_stat_array diff --git a/app/src/rawio.c b/app/src/rawio.c index c8615d2..97398f0 100644 --- a/app/src/rawio.c +++ b/app/src/rawio.c @@ -5,6 +5,7 @@ #include <mrapp.h> #include <protect.h> #include <rte_epoll.h> +#include <sys/epoll.h> #include <rte_ip.h> #include <rte_malloc.h> #include <rte_mbuf.h> @@ -55,10 +56,13 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu struct vdev_instance * vdi = vdev->vdi; struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs; + struct mr_vdev_counters * counters = &vdev->counters[qid]; int ret = vnode_mirror_dequeue_burst(vdi->vnode_rx_cons, qid, __mbufs, nr_mbufs); if (unlikely(ret == 0)) + { goto out; + } PROTECT_rte_mbuf_unpoison_bulk(__mbufs, ret); @@ -80,7 +84,17 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu } PROTECT_rte_mbuf_poison_bulk(__mbufs, ret); + out: + if (ret == 0) + { + counters->rx_burst_zero_counters++; + } + else + { + counters->rx_burst_call_counters++; + } + return ret; } @@ -183,17 +197,47 @@ void marsio_send_burst_flush(struct mr_sendpath * sendpath, queue_id_t sid) vnode_mirror_flush(sendpath->target_vdi->vnode_ftx_prod, sid); } +int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid) +{ + struct rte_epoll_event * epoll_event = rte_zmalloc(NULL, sizeof(struct rte_epoll_event), 0); + epoll_event->epdata.event = EPOLLIN; + epoll_event->epdata.data = NULL; + return rte_epoll_ctl(instance->rx_notify_epfd[tid], EPOLL_CTL_ADD, eventfd, epoll_event); +} + int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid, int timeout) { struct rte_epoll_event epoll_events[16] = {}; eventfd_t _not_use_eventfd_value; - /* set the notify status to sleep */ + /* flush the send buffer */ + for(unsigned int i = 0; i < nr_vdevs; i++) + { + vnode_mirror_flush(vdevs[i]->vdi->vnode_tx_prod, tid); + vnode_mirror_flush(vdevs[i]->vdi->vnode_ftx_prod, tid); + } + + for(unsigned int i =0; i < nr_vdevs; i++) + { + struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid]; + if (vdev_counter->rx_burst_zero_counters < instance->rx_notify_defer_threshold) + { + return 0; + } + } + + /* clear the vdev counters, set the notify status to sleep */ for (unsigned int i = 0; i < nr_vdevs; i++) { + struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid]; + vdev_counter->rx_burst_zero_counters = 0; + vdev_counter->rx_burst_call_counters = 0; + struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify; - cons_notify[tid].notify_status = 1; + + assert(cons_notify[tid].cons_running_status == CONS_STATUS_RUNNING); + cons_notify[tid].cons_running_status = CONS_STATUS_WAITING; } /* wait for the event until timeout */ @@ -212,12 +256,23 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un eventfd_read(epoll_event->fd, &_not_use_eventfd_value); } + if (n > 0) + { + instance->stat[tid].poll_wake_up_by_ev++; + } + else + { + instance->stat[tid].poll_wake_up_timeout++; + } + + /* go back from wait */ for (unsigned int i = 0; i < nr_vdevs; i++) { struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify; - cons_notify[tid].notify_status = 0; + + assert(cons_notify[tid].cons_running_status == CONS_STATUS_WAITING); + cons_notify[tid].cons_running_status = CONS_STATUS_RUNNING; } - instance->stat_notify_wake_up[tid]++; return 0; }
\ No newline at end of file diff --git a/app/src/version.map b/app/src/version.map index 28116a2..ac83a57 100644 --- a/app/src/version.map +++ b/app/src/version.map @@ -82,6 +82,7 @@ global: marsio_buff_prepend_sid_list; marsio_buff_get_current_sid; marsio_poll_wait; + marsio_poll_register_eventfd; local: *; }; diff --git a/include/external/marsio.h b/include/external/marsio.h index 791f73b..f3314c5 100644 --- a/include/external/marsio.h +++ b/include/external/marsio.h @@ -110,7 +110,7 @@ enum mr_buff_metadata_type /* dir, internal->external or external->internal */ MR_BUFF_DIR = 4, /* payload offset */ - MR_BUFF_PAYLOAD_OFFSET = 5 + MR_BUFF_PAYLOAD_OFFSET = 5, }; #ifdef __cplusplus @@ -152,6 +152,8 @@ int marsio_option_set(struct mr_instance * instance, marsio_opt_type_t opt_type, int marsio_init(struct mr_instance * instance, const char * appsym); int marsio_thread_init(struct mr_instance * instance); int marsio_destory(struct mr_instance * instance); + +int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid); int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid, int timeout); struct mr_vdev * marsio_open_device(struct mr_instance * instance, const char * devsym, unsigned int nr_rxstream, diff --git a/include/internal/vdev_define.h b/include/internal/vdev_define.h index 54ad015..b12417f 100644 --- a/include/internal/vdev_define.h +++ b/include/internal/vdev_define.h @@ -51,6 +51,13 @@ struct vdev_stat_info uint64_t ltx_deliver[MR_SID_MAX]; uint64_t ltx_missed[MR_SID_MAX]; uint64_t ltx_total_len[MR_SID_MAX]; + +#if 0 + /* NOTIFY */ + uint64_t notify_state_waiting[MR_SID_MAX]; + uint64_t notify_state_running[MR_SID_MAX]; + uint64_t notify_state_ready[MR_SID_MAX]; +#endif }; /* 虚拟设备信息 */ diff --git a/infra/include/vnode.h b/infra/include/vnode.h index 4ecdd81..96ba4ac 100644 --- a/infra/include/vnode.h +++ b/infra/include/vnode.h @@ -91,13 +91,25 @@ struct vnode_prod_stat volatile uint64_t cloned_fail; #endif + /* notify */ + rte_atomic64_t notify_state_waiting; + rte_atomic64_t notify_state_running; + rte_atomic64_t notify_state_ready; + } __rte_cache_aligned; +enum cons_running_status +{ + CONS_STATUS_RUNNING, + CONS_STATUS_WAITING, + CONS_STATUS_READY, +}; + struct vnode_cons_notify { int enable; - int notify_eventfd; - volatile int notify_status; + int cons_notify_eventfd; + volatile int cons_running_status; }; #if VNODE_STAT_BY_ATOMIC diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index 54daf0b..5ad2504 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -323,10 +323,10 @@ struct vnode_cons * __vnode_common_create_cons(struct vnode * vnode, const char if(vnode->notify_cons_when_rx) { cons_notify_ctx->enable = 1; - cons_notify_ctx->notify_status = 0; - cons_notify_ctx->notify_eventfd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK); + cons_notify_ctx->cons_running_status = CONS_STATUS_RUNNING; + cons_notify_ctx->cons_notify_eventfd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK); - if (unlikely(cons_notify_ctx->notify_eventfd < 0)) + if (unlikely(cons_notify_ctx->cons_notify_eventfd < 0)) { MR_ERROR("failed at create eventfd for vnode consumer %s: %s", symbol, strerror(errno)); goto err; @@ -492,7 +492,7 @@ int __vnode_common_delete_cons(struct vnode_cons * cons) { if (cons->notify[qid].enable) { - close(cons->notify[qid].notify_eventfd); + close(cons->notify[qid].cons_notify_eventfd); } } diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c index 6979db6..fa972c2 100644 --- a/infra/src/vnode_mirror.c +++ b/infra/src/vnode_mirror.c @@ -53,9 +53,44 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons out: cons_notify_ctx = &cons->notify[consq]; - if (cons_notify_ctx->enable && cons_notify_ctx->notify_status == 1) + if(cons_notify_ctx->enable) { - eventfd_write(cons_notify_ctx->notify_eventfd, 1); +#if 0 + if (cons_notify_ctx->cons_running_status == CONS_STATUS_WAITING) + { + VNODE_STAT_UPDATE(prod, prodq, notify_state_waiting, 1); + } + else if (cons_notify_ctx->cons_running_status == CONS_STATUS_READY) + { + VNODE_STAT_UPDATE(prod, prodq, notify_state_ready, 1); + } + else if (cons_notify_ctx->cons_running_status == CONS_STATUS_RUNNING) + { + VNODE_STAT_UPDATE(prod, prodq, notify_state_running, 1); + } + else + { + assert(0); + } +#endif + + /* wakeup the cons when it is waiting */ + if (cons_notify_ctx->cons_running_status == CONS_STATUS_WAITING) + { + const static uint64_t _v = 1; + ssize_t size = write(cons_notify_ctx->cons_notify_eventfd, &_v, sizeof(_v)); + +#if 0 + if (size < 0) + { + MR_ERROR("Failed to send interrupt: %s", strerror(errno)); + } + else + { + cons_notify_ctx->cons_running_status = CONS_STATUS_READY; + } +#endif + } } // 更新生产者统计计数 diff --git a/service/src/monit.c b/service/src/monit.c index f0d9961..314c575 100644 --- a/service/src/monit.c +++ b/service/src/monit.c @@ -80,6 +80,13 @@ static cJSON * __create_vdev_stats(struct vdev * vdev, unsigned int nr_serv_thre __JOIN_VDEV_VALUE_STATS_ITEM(ltx_missed); __JOIN_VDEV_VALUE_STATS_ITEM(ltx_total_len); + /* notify events */ +#if 0 + __JOIN_VDEV_VALUE_STATS_ITEM(notify_state_waiting); + __JOIN_VDEV_VALUE_STATS_ITEM(notify_state_running); + __JOIN_VDEV_VALUE_STATS_ITEM(notify_state_ready); +#endif + #define __JOIN_VDEV_SPEED_STATS_ITEM(item) \ do \ { \ diff --git a/service/src/vdata.c b/service/src/vdata.c index b6441c5..558008a 100644 --- a/service/src/vdata.c +++ b/service/src/vdata.c @@ -115,6 +115,12 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta stat_info->rx_deliver[i] = VNODE_STAT_READ(&st_prod_rx[i].deliver); stat_info->rx_missed[i] = VNODE_STAT_READ(&st_prod_rx[i].missed); stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].on_line); + +#if 0 + stat_info->notify_state_running[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_running); + stat_info->notify_state_ready[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_ready); + stat_info->notify_state_waiting[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_waiting); +#endif } for (int i = 0; i < _vdev->nr_txstream; i++) diff --git a/service/src/vdev.c b/service/src/vdev.c index 32c8f53..cbc4819 100644 --- a/service/src/vdev.c +++ b/service/src/vdev.c @@ -142,7 +142,7 @@ static int vdev_instance_create_handler(const struct rte_mp_msg * msg, const voi struct vnode_cons_notify * notify_ctx = vnode_mirror_notify_ctx_cons(vdi->vnode_rx_cons); for (unsigned int i = 0; i < msg_req->nr_rxstream; i++) { - msg_rep->fds[i] = notify_ctx[i].notify_eventfd; + msg_rep->fds[i] = notify_ctx[i].cons_notify_eventfd; } msg_rep->num_fds = msg_req->nr_rxstream; |
