summaryrefslogtreecommitdiff
path: root/app/src
diff options
context:
space:
mode:
authorLu Qiuwen <[email protected]>2023-08-02 13:57:10 +0800
committerLu Qiuwen <[email protected]>2023-08-02 15:00:26 +0800
commitfcf0ba2b8862814acc8552fc6b019be5674d33a6 (patch)
treeddefeb386db54e88eee67ab8aad066317573263d /app/src
parent2d785acf08d8c27225fd7034c9618dfb74e25092 (diff)
改进poll_wait机制,避免service频繁通知应用的数据面线程。
Diffstat (limited to 'app/src')
-rw-r--r--app/src/marsio.c10
-rw-r--r--app/src/mrb.c5
-rw-r--r--app/src/rawio.c56
3 files changed, 52 insertions, 19 deletions
diff --git a/app/src/marsio.c b/app/src/marsio.c
index bc08bd3..0ea11fe 100644
--- a/app/src/marsio.c
+++ b/app/src/marsio.c
@@ -135,8 +135,14 @@ static void mrapp_rx_notify_init(struct mr_instance * instance)
instance->rx_notify_epfd[i] = epfd;
}
- MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_throttle",
- &instance->notify_throttle_threshold, 128);
+ MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_throttle_usleep_threshold",
+ &instance->zero_recv_usleep_threshold, 32);
+
+ MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_throttle_usleep_period",
+ &instance->zero_recv_usleep_period, 5);
+
+ MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_throttle_notify_threshold",
+ &instance->zero_recv_notify_threshold, 256);
MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "poll_wait_enable", &instance->en_notify, 1);
MESA_load_profile_uint_def(instance->g_cfgfile_path, "service", "vdev_buffer_size", &instance->sz_vdev_buffer,
diff --git a/app/src/mrb.c b/app/src/mrb.c
index e03bc7f..bac8da4 100644
--- a/app/src/mrb.c
+++ b/app/src/mrb.c
@@ -211,10 +211,13 @@ void marsio_buff_do_rehash(struct mr_instance * mr_instance, marsio_buff_t * m)
pkt_parser_init(&pkt_parser, &mrb_metadata->pkt_parser_result, LAYER_TYPE_ALL, MR_PKT_PARSER_LAYERS_MAX);
complex_parser_ether(&pkt_parser, rte_pktmbuf_mtod(mbuf, const char *));
- /* do the rehash */
struct rte_mbuf * mbufs[1] = {mbuf};
struct pkt_parser_result * parser_results[1] = {&mrb_metadata->pkt_parser_result};
+ /* clear the txq hash */
+ mbuf->hash.txadapter.txq = 0;
+
+ /* do the rehash */
distributer_calculate_from_parser_results(mr_instance->dist_object, mbufs, parser_results, 1);
}
diff --git a/app/src/rawio.c b/app/src/rawio.c
index fd4db9d..b919278 100644
--- a/app/src/rawio.c
+++ b/app/src/rawio.c
@@ -79,7 +79,7 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
}
else
{
- counters->rx_burst_call_counters++;
+ counters->rx_burst_zero_counters = 0;
}
rx_buffer->curser = 0;
@@ -127,7 +127,16 @@ int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid)
hash_t hash[MR_BURST_MAX];
for (int i = 0; i < tx_buffer->length; i++)
{
- hash[i] = tx_buffer->mbufs[i]->hash.usr;
+ uint16_t hash_txq = tx_buffer->mbufs[i]->hash.txadapter.txq;
+ uint32_t hash_usr = tx_buffer->mbufs[i]->hash.usr;
+
+ /* if the hash_qid's highest bit is set, use the txq */
+#if 1
+ hash[i] = hash_txq & 0x8000 ? hash_txq & 0x7FFF : hash_usr;
+#else
+ /* round-robin */
+ hash[i] = round_robin_counter++;
+#endif
}
int ret = vnode_mirror_enqueue_bulk(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs, hash, (int)tx_buffer->length);
@@ -220,41 +229,55 @@ int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, uns
return rte_epoll_ctl(instance->rx_notify_epfd[tid], EPOLL_CTL_ADD, eventfd, epoll_event);
}
+static void clear_all_zero_recv_counters(struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid)
+{
+ for (unsigned int i = 0; i < nr_vdevs; i++)
+ {
+ struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
+ vdev_counter->rx_burst_zero_counters = 0;
+ vdev_counter->rx_burst_call_counters = 0;
+ }
+}
+
int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid,
int timeout)
{
eventfd_t _not_use_eventfd_value;
+ /* flush the vdev */
+ for (unsigned int i = 0; i < nr_vdevs; i++)
+ {
+ marsio_send_buffer_flush(vdevs[i], tid);
+ }
+
if (instance->en_notify == 0)
{
return 0;
}
+ unsigned int min_rx_burst_zero_counters = UINT32_MAX;
for (unsigned int i = 0; i < nr_vdevs; i++)
{
struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
- if (vdev_counter->rx_burst_zero_counters < instance->notify_throttle_threshold)
- {
- return 0;
- }
+ min_rx_burst_zero_counters = RTE_MIN(min_rx_burst_zero_counters, vdev_counter->rx_burst_zero_counters);
}
- /* flush the vdev */
- for (unsigned int i = 0; i < nr_vdevs; i++)
+ if (min_rx_burst_zero_counters < instance->zero_recv_usleep_threshold)
{
- marsio_send_buffer_flush(vdevs[i], tid);
+ return 0;
}
- /* clear the vdev counters, set the notify status to sleep */
- for (unsigned int i = 0; i < nr_vdevs; i++)
+ if (min_rx_burst_zero_counters < instance->zero_recv_notify_threshold)
{
- struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
- vdev_counter->rx_burst_zero_counters = 0;
- vdev_counter->rx_burst_call_counters = 0;
+ rte_delay_us_sleep(instance->zero_recv_usleep_period);
+ return 0;
+ }
+ /* set the notify status to sleep */
+ for (unsigned int i = 0; i < nr_vdevs; i++)
+ {
struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
-
assert(cons_notify[tid].cons_running_status == CONS_STATUS_RUNNING);
cons_notify[tid].cons_running_status = CONS_STATUS_WAITING;
}
@@ -272,11 +295,12 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un
for (unsigned int i = 0; i < nr_vdevs; i++)
{
struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
-
assert(cons_notify[tid].cons_running_status == CONS_STATUS_WAITING);
cons_notify[tid].cons_running_status = CONS_STATUS_RUNNING;
}
+ clear_all_zero_recv_counters(vdevs, nr_vdevs, tid);
+
/* read the data in pipe, and drop it */
/* handle the read event, read the packet then redirect to shmdev queues */
for (int i = 0; i < n; i++)