summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- app/include/mrapp.h 18
-rw-r--r-- app/src/marsio.c 1
-rw-r--r-- app/src/monit.c 4
-rw-r--r-- app/src/rawio.c 63
-rw-r--r-- app/src/version.map 1
-rw-r--r-- include/external/marsio.h 4
-rw-r--r-- include/internal/vdev_define.h 7
-rw-r--r-- infra/include/vnode.h 16
-rw-r--r-- infra/src/vnode_common.c 8
-rw-r--r-- infra/src/vnode_mirror.c 39
-rw-r--r-- service/src/monit.c 7
-rw-r--r-- service/src/vdata.c 6
-rw-r--r-- service/src/vdev.c 2
13 files changed, 161 insertions, 15 deletions
diff --git a/app/include/mrapp.h b/app/include/mrapp.h
index cc0ee7b..afe019e 100644
--- a/app/include/mrapp.h
+++ b/app/include/mrapp.h
@@ -24,8 +24,17 @@ struct mrapp_stat
uint64_t packet_recv_drop;
uint64_t packet_send_drop;
+
+ uint64_t poll_wake_up_by_ev;
+ uint64_t poll_wake_up_timeout;
};
+struct mr_vdev_counters /* rx polling counters; mr_vdev keeps one per queue in counters[MR_SID_MAX] */
+{
+ unsigned int rx_burst_call_counters; /* bumped by marsio_recv_burst() when the dequeue result is non-zero (not on every call) */
+ unsigned int rx_burst_zero_counters; /* bumped by marsio_recv_burst() when the dequeue returned 0; marsio_poll_wait() compares it with rx_notify_defer_threshold and clears both counters before waiting */
+} __rte_cache_aligned;
+
/* 用户设备描述符 */
struct mr_vdev
{
@@ -40,6 +49,8 @@ struct mr_vdev
int rx_notify_fds[MR_SID_MAX];
struct rte_epoll_event rx_notify_epoll_events[MR_SID_MAX];
unsigned int nr_rx_notify_fds;
+
+ struct mr_vdev_counters counters[MR_SID_MAX];
};
struct mr_thread_info
@@ -133,7 +144,12 @@ struct mr_instance
/* rx epoll fd */
int rx_notify_epfd[MR_SID_MAX];
- unsigned int stat_notify_wake_up[MR_SID_MAX];
+ unsigned int stat_notify_wake_up_from_ev[MR_SID_MAX];
+ unsigned int stat_notify_wake_up_from_timeout[MR_SID_MAX];
+ unsigned int busy_loop_counter[MR_SID_MAX];
+
+ /* threshold */
+ unsigned int rx_notify_defer_threshold;
/* mempool cache map */
struct mp_cache_map * mp_cache_map[MR_MEMPOOL_COUNT_MAX];
diff --git a/app/src/marsio.c b/app/src/marsio.c
index 0ceb4b6..ad22ee7 100644
--- a/app/src/marsio.c
+++ b/app/src/marsio.c
@@ -972,6 +972,7 @@ int marsio_init(struct mr_instance * instance, const char * appsym)
/* 初始化EAL环境 */
mrapp_eal_init(instance);
mrapp_rx_notify_epfd_init(instance);
+ instance->rx_notify_defer_threshold = 128;
/* 注册处理应用注册结果的回调函数 */
ctrlmsg_msg_reciver_register(instance->ctrlmsg_handler, CTRLMSG_TOPIC_APP_REGISTER, CTRL_MSG_TYPE_RESPONSE,
diff --git a/app/src/monit.c b/app/src/monit.c
index 98c8add..ff474fd 100644
--- a/app/src/monit.c
+++ b/app/src/monit.c
@@ -218,6 +218,8 @@ static cJSON * monit_app_stat(struct mr_instance * instance)
_trans_app_stat(packet_send_drop);
_trans_app_stat(mbuf_alloc_count);
_trans_app_stat(mbuf_free_count);
+ _trans_app_stat(poll_wake_up_by_ev);
+ _trans_app_stat(poll_wake_up_timeout);
/* 在用Buffer数量统计 */
uint64_t __trans_mbuf_in_use_count[MR_SID_MAX];
@@ -243,6 +245,8 @@ do { \
_json_generate(mbuf_alloc_count);
_json_generate(mbuf_free_count);
_json_generate(mbuf_in_use_count);
+ _json_generate(poll_wake_up_by_ev);
+ _json_generate(poll_wake_up_timeout);
#undef _trans_app_stat
#undef _trans_app_stat_array
diff --git a/app/src/rawio.c b/app/src/rawio.c
index c8615d2..97398f0 100644
--- a/app/src/rawio.c
+++ b/app/src/rawio.c
@@ -5,6 +5,7 @@
#include <mrapp.h>
#include <protect.h>
#include <rte_epoll.h>
+#include <sys/epoll.h>
#include <rte_ip.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
@@ -55,10 +56,13 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
struct vdev_instance * vdi = vdev->vdi;
struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs;
+ struct mr_vdev_counters * counters = &vdev->counters[qid];
int ret = vnode_mirror_dequeue_burst(vdi->vnode_rx_cons, qid, __mbufs, nr_mbufs);
if (unlikely(ret == 0))
+ {
goto out;
+ }
PROTECT_rte_mbuf_unpoison_bulk(__mbufs, ret);
@@ -80,7 +84,17 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
}
PROTECT_rte_mbuf_poison_bulk(__mbufs, ret);
+
out:
+ if (ret == 0)
+ {
+ counters->rx_burst_zero_counters++;
+ }
+ else
+ {
+ counters->rx_burst_call_counters++;
+ }
+
return ret;
}
@@ -183,17 +197,47 @@ void marsio_send_burst_flush(struct mr_sendpath * sendpath, queue_id_t sid)
vnode_mirror_flush(sendpath->target_vdi->vnode_ftx_prod, sid);
}
+/*
+ * Add a caller-supplied eventfd, watched for EPOLLIN, to the rx-notify epoll
+ * instance of `tid` (instance->rx_notify_epfd[tid]).
+ * NOTE(review): presumably this lets marsio_poll_wait() on the same tid wake
+ * up on that fd as well; the wait call itself is not visible in this hunk.
+ *
+ * instance: application instance owning the per-thread epoll fds.
+ * eventfd:  file descriptor to register.
+ * tid:      index into rx_notify_epfd[]; must be smaller than MR_SID_MAX.
+ *
+ * Returns the rte_epoll_ctl() result, or -1 when `tid` is out of range or the
+ * event descriptor cannot be allocated.
+ */
+int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid)
+{
+    /* rx_notify_epfd[] has MR_SID_MAX entries; reject indexes past the end */
+    if (unlikely(tid >= MR_SID_MAX))
+    {
+        return -1;
+    }
+
+    struct rte_epoll_event * epoll_event = rte_zmalloc(NULL, sizeof(*epoll_event), 0);
+    if (unlikely(epoll_event == NULL))
+    {
+        return -1;
+    }
+
+    epoll_event->epdata.event = EPOLLIN;
+    epoll_event->epdata.data = NULL;
+
+    /*
+     * The event object is handed to rte_epoll_ctl() and has to stay allocated
+     * while the fd is registered, so it is released only when the
+     * registration itself fails.
+     */
+    int ret = rte_epoll_ctl(instance->rx_notify_epfd[tid], EPOLL_CTL_ADD, eventfd, epoll_event);
+    if (ret < 0)
+    {
+        rte_free(epoll_event);
+    }
+
+    return ret;
+}
+
int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid,
int timeout)
{
struct rte_epoll_event epoll_events[16] = {};
eventfd_t _not_use_eventfd_value;
- /* set the notify status to sleep */
+ /* flush the send buffer */
+ for(unsigned int i = 0; i < nr_vdevs; i++)
+ {
+ vnode_mirror_flush(vdevs[i]->vdi->vnode_tx_prod, tid);
+ vnode_mirror_flush(vdevs[i]->vdi->vnode_ftx_prod, tid);
+ }
+
+ for(unsigned int i =0; i < nr_vdevs; i++)
+ {
+ struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
+ if (vdev_counter->rx_burst_zero_counters < instance->rx_notify_defer_threshold)
+ {
+ return 0;
+ }
+ }
+
+ /* clear the vdev counters, set the notify status to sleep */
for (unsigned int i = 0; i < nr_vdevs; i++)
{
+ struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
+ vdev_counter->rx_burst_zero_counters = 0;
+ vdev_counter->rx_burst_call_counters = 0;
+
struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
- cons_notify[tid].notify_status = 1;
+
+ assert(cons_notify[tid].cons_running_status == CONS_STATUS_RUNNING);
+ cons_notify[tid].cons_running_status = CONS_STATUS_WAITING;
}
/* wait for the event until timeout */
@@ -212,12 +256,23 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un
eventfd_read(epoll_event->fd, &_not_use_eventfd_value);
}
+ if (n > 0)
+ {
+ instance->stat[tid].poll_wake_up_by_ev++;
+ }
+ else
+ {
+ instance->stat[tid].poll_wake_up_timeout++;
+ }
+
+ /* go back from wait */
for (unsigned int i = 0; i < nr_vdevs; i++)
{
struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
- cons_notify[tid].notify_status = 0;
+
+ assert(cons_notify[tid].cons_running_status == CONS_STATUS_WAITING);
+ cons_notify[tid].cons_running_status = CONS_STATUS_RUNNING;
}
- instance->stat_notify_wake_up[tid]++;
return 0;
}
\ No newline at end of file
diff --git a/app/src/version.map b/app/src/version.map
index 28116a2..ac83a57 100644
--- a/app/src/version.map
+++ b/app/src/version.map
@@ -82,6 +82,7 @@ global:
marsio_buff_prepend_sid_list;
marsio_buff_get_current_sid;
marsio_poll_wait;
+ marsio_poll_register_eventfd;
local: *;
};
diff --git a/include/external/marsio.h b/include/external/marsio.h
index 791f73b..f3314c5 100644
--- a/include/external/marsio.h
+++ b/include/external/marsio.h
@@ -110,7 +110,7 @@ enum mr_buff_metadata_type
/* dir, internal->external or external->internal */
MR_BUFF_DIR = 4,
/* payload offset */
- MR_BUFF_PAYLOAD_OFFSET = 5
+ MR_BUFF_PAYLOAD_OFFSET = 5,
};
#ifdef __cplusplus
@@ -152,6 +152,8 @@ int marsio_option_set(struct mr_instance * instance, marsio_opt_type_t opt_type,
int marsio_init(struct mr_instance * instance, const char * appsym);
int marsio_thread_init(struct mr_instance * instance);
int marsio_destory(struct mr_instance * instance);
+
+int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid);
int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid, int timeout);
struct mr_vdev * marsio_open_device(struct mr_instance * instance, const char * devsym, unsigned int nr_rxstream,
diff --git a/include/internal/vdev_define.h b/include/internal/vdev_define.h
index 54ad015..b12417f 100644
--- a/include/internal/vdev_define.h
+++ b/include/internal/vdev_define.h
@@ -51,6 +51,13 @@ struct vdev_stat_info
uint64_t ltx_deliver[MR_SID_MAX];
uint64_t ltx_missed[MR_SID_MAX];
uint64_t ltx_total_len[MR_SID_MAX];
+
+#if 0
+ /* NOTIFY */
+ uint64_t notify_state_waiting[MR_SID_MAX];
+ uint64_t notify_state_running[MR_SID_MAX];
+ uint64_t notify_state_ready[MR_SID_MAX];
+#endif
};
/* 虚拟设备信息 */
diff --git a/infra/include/vnode.h b/infra/include/vnode.h
index 4ecdd81..96ba4ac 100644
--- a/infra/include/vnode.h
+++ b/infra/include/vnode.h
@@ -91,13 +91,25 @@ struct vnode_prod_stat
volatile uint64_t cloned_fail;
#endif
+ /* notify */
+ rte_atomic64_t notify_state_waiting;
+ rte_atomic64_t notify_state_running;
+ rte_atomic64_t notify_state_ready;
+
} __rte_cache_aligned;
+enum cons_running_status /* values stored in vnode_cons_notify.cons_running_status */
+{
+ CONS_STATUS_RUNNING, /* state assigned when the consumer is created, and restored when marsio_poll_wait() comes back from waiting */
+ CONS_STATUS_WAITING, /* set by marsio_poll_wait() just before it waits; the producer flush path writes the eventfd only in this state */
+ CONS_STATUS_READY, /* referenced only from code that is currently compiled out with #if 0 */
+};
+
struct vnode_cons_notify
{
int enable; /* set to 1 when the vnode has notify_cons_when_rx; gates both the producer-side wakeup and the close() on delete */
- int notify_eventfd;
- volatile int notify_status;
+ int cons_notify_eventfd; /* eventfd created with EFD_CLOEXEC | EFD_NONBLOCK; the producer writes to it to wake the consumer */
+ volatile int cons_running_status; /* holds an enum cons_running_status value; written by the consumer, read by the producer flush path */
};
#if VNODE_STAT_BY_ATOMIC
diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c
index 54daf0b..5ad2504 100644
--- a/infra/src/vnode_common.c
+++ b/infra/src/vnode_common.c
@@ -323,10 +323,10 @@ struct vnode_cons * __vnode_common_create_cons(struct vnode * vnode, const char
if(vnode->notify_cons_when_rx)
{
cons_notify_ctx->enable = 1;
- cons_notify_ctx->notify_status = 0;
- cons_notify_ctx->notify_eventfd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK);
+ cons_notify_ctx->cons_running_status = CONS_STATUS_RUNNING;
+ cons_notify_ctx->cons_notify_eventfd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK);
- if (unlikely(cons_notify_ctx->notify_eventfd < 0))
+ if (unlikely(cons_notify_ctx->cons_notify_eventfd < 0))
{
MR_ERROR("failed at create eventfd for vnode consumer %s: %s", symbol, strerror(errno));
goto err;
@@ -492,7 +492,7 @@ int __vnode_common_delete_cons(struct vnode_cons * cons)
{
if (cons->notify[qid].enable)
{
- close(cons->notify[qid].notify_eventfd);
+ close(cons->notify[qid].cons_notify_eventfd);
}
}
diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c
index 6979db6..fa972c2 100644
--- a/infra/src/vnode_mirror.c
+++ b/infra/src/vnode_mirror.c
@@ -53,9 +53,44 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
out:
cons_notify_ctx = &cons->notify[consq];
- if (cons_notify_ctx->enable && cons_notify_ctx->notify_status == 1)
+ if(cons_notify_ctx->enable)
{
- eventfd_write(cons_notify_ctx->notify_eventfd, 1);
+#if 0
+ if (cons_notify_ctx->cons_running_status == CONS_STATUS_WAITING)
+ {
+ VNODE_STAT_UPDATE(prod, prodq, notify_state_waiting, 1);
+ }
+ else if (cons_notify_ctx->cons_running_status == CONS_STATUS_READY)
+ {
+ VNODE_STAT_UPDATE(prod, prodq, notify_state_ready, 1);
+ }
+ else if (cons_notify_ctx->cons_running_status == CONS_STATUS_RUNNING)
+ {
+ VNODE_STAT_UPDATE(prod, prodq, notify_state_running, 1);
+ }
+ else
+ {
+ assert(0);
+ }
+#endif
+
+ /* wakeup the cons when it is waiting */
+ if (cons_notify_ctx->cons_running_status == CONS_STATUS_WAITING)
+ {
+ const static uint64_t _v = 1;
+ ssize_t size = write(cons_notify_ctx->cons_notify_eventfd, &_v, sizeof(_v));
+
+#if 0
+ if (size < 0)
+ {
+ MR_ERROR("Failed to send interrupt: %s", strerror(errno));
+ }
+ else
+ {
+ cons_notify_ctx->cons_running_status = CONS_STATUS_READY;
+ }
+#endif
+ }
}
// 更新生产者统计计数
diff --git a/service/src/monit.c b/service/src/monit.c
index f0d9961..314c575 100644
--- a/service/src/monit.c
+++ b/service/src/monit.c
@@ -80,6 +80,13 @@ static cJSON * __create_vdev_stats(struct vdev * vdev, unsigned int nr_serv_thre
__JOIN_VDEV_VALUE_STATS_ITEM(ltx_missed);
__JOIN_VDEV_VALUE_STATS_ITEM(ltx_total_len);
+ /* notify events */
+#if 0
+ __JOIN_VDEV_VALUE_STATS_ITEM(notify_state_waiting);
+ __JOIN_VDEV_VALUE_STATS_ITEM(notify_state_running);
+ __JOIN_VDEV_VALUE_STATS_ITEM(notify_state_ready);
+#endif
+
#define __JOIN_VDEV_SPEED_STATS_ITEM(item) \
do \
{ \
diff --git a/service/src/vdata.c b/service/src/vdata.c
index b6441c5..558008a 100644
--- a/service/src/vdata.c
+++ b/service/src/vdata.c
@@ -115,6 +115,12 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta
stat_info->rx_deliver[i] = VNODE_STAT_READ(&st_prod_rx[i].deliver);
stat_info->rx_missed[i] = VNODE_STAT_READ(&st_prod_rx[i].missed);
stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].on_line);
+
+#if 0
+ stat_info->notify_state_running[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_running);
+ stat_info->notify_state_ready[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_ready);
+ stat_info->notify_state_waiting[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_waiting);
+#endif
}
for (int i = 0; i < _vdev->nr_txstream; i++)
diff --git a/service/src/vdev.c b/service/src/vdev.c
index 32c8f53..cbc4819 100644
--- a/service/src/vdev.c
+++ b/service/src/vdev.c
@@ -142,7 +142,7 @@ static int vdev_instance_create_handler(const struct rte_mp_msg * msg, const voi
struct vnode_cons_notify * notify_ctx = vnode_mirror_notify_ctx_cons(vdi->vnode_rx_cons);
for (unsigned int i = 0; i < msg_req->nr_rxstream; i++)
{
- msg_rep->fds[i] = notify_ctx[i].notify_eventfd;
+ msg_rep->fds[i] = notify_ctx[i].cons_notify_eventfd;
}
msg_rep->num_fds = msg_req->nr_rxstream;