/*
 * NOTE: the original include list was stripped of its header names during
 * extraction. The libc and DPDK headers below are inferred from the symbols
 * used in this file; the project-local marsio/vnode/vdev/dp_trace headers
 * could not be recovered.
 */
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#include <rte_branch_prediction.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_interrupts.h>
#include <rte_malloc.h>
#include <rte_mbuf.h>
#include <rte_spinlock.h>

/* project-local headers elided in the original */

/* sum of rte_pktmbuf_pkt_len() over a burst */
static inline unsigned int packet_total_len(struct rte_mbuf * mbufs[], unsigned int nr_mbufs)
{
    unsigned int total_len = 0;

    for (unsigned int i = 0; i < nr_mbufs; i++)
        total_len += rte_pktmbuf_pkt_len(mbufs[i]);
    return total_len;
}

int mrapp_packet_fast_send_burst(struct mr_instance * instance, struct vdev_instance * vdi,
                                 queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs)
{
    hash_t mbufs_hash[MR_BURST_MAX];

    for (int i = 0; i < nr_mbufs; i++) {
        mbufs_hash[i] = qid;
        __rte_mbuf_sanity_check(mbufs[i], 1);
    }

    /*
     * Fast-send lock. Fast packets may be sent from any context. The lock is
     * implemented in the application, so an application crash cannot affect
     * the main process. Fast-send traffic should be very small, so this lock
     * is not a performance concern.
     */
    /* TODO: move the lock to a better place */
    static rte_spinlock_t _f_fast_lock = RTE_SPINLOCK_INITIALIZER;

    PROTECT_rte_mbuf_poison_bulk(mbufs, nr_mbufs);
    rte_spinlock_lock(&_f_fast_lock);
    vnode_mirror_enqueue_bulk(vdi->vnode_ftx_prod, qid, mbufs, mbufs_hash, nr_mbufs);
    int ret = vnode_mirror_rt_object_retrieve(vdi->vnode_ftx_prod, qid, mbufs, nr_mbufs);
    if (ret > 0) {
        marsio_buff_free_v2(instance, (marsio_buff_t **)mbufs, ret);
    }
    rte_spinlock_unlock(&_f_fast_lock);

    return ret;
}

int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbufs[], int nr_mbufs)
{
    if (unlikely(qid >= vdev->nr_rxstream))
        return -EINVAL;

    struct vdev_instance * vdi = vdev->vdi;
    struct mr_vdev_rx_buffer * rx_buffer = vdev->rx_buffer[qid];

    /* rx_buffer is empty: refill it from the vnode RX ring */
    if (rx_buffer->curser == rx_buffer->length) {
        int nr_rx_mbufs = vnode_mirror_dequeue_burst(vdi->vnode_rx_cons, qid,
                                                     rx_buffer->mbufs, (int)rx_buffer->size);
        if (vdev->tap_representor != NULL) {
            tap_representor_entry(vdev, qid, rx_buffer->mbufs, nr_rx_mbufs);
        }

        for (int i = 0; i < nr_rx_mbufs; i++) {
            __rte_mbuf_sanity_check(rx_buffer->mbufs[i], 1);
        }

        struct mr_vdev_counters * counters = &vdev->counters[qid];
        if (nr_rx_mbufs == 0) {
            counters->rx_burst_zero_counters++;
        } else {
            counters->rx_burst_zero_counters = 0;
        }

        rx_buffer->curser = 0;
        rx_buffer->length = nr_rx_mbufs;
    }

    /* dequeue from the rx buffer to the application */
    unsigned int nr_mbufs_out = rx_buffer->length - rx_buffer->curser;
    if (nr_mbufs_out > (unsigned int)nr_mbufs) {
        nr_mbufs_out = (unsigned int)nr_mbufs;
    }

    for (unsigned int i = 0; i < nr_mbufs_out; i++) {
        mbufs[i] = rx_buffer->mbufs[rx_buffer->curser + i];
        if (unlikely(marsio_dp_trace_measurements_can_emit(vdev->instance, mbufs[i],
                                                           DP_TRACE_MEASUREMENT_TYPE_TELEMETRY))) {
            marsio_dp_trace_measurement_emit_fmt(vdev->instance, mbufs[i],
                                                 DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "app_recv",
                                                 "rx, dev=%s, qid=%u", vdev->devsym, qid);
        }
        if (unlikely(marsio_dp_trace_measurements_can_emit(vdev->instance, mbufs[i],
                                                           DP_TRACE_MEASUREMENT_TYPE_TRACE))) {
            marsio_dp_trace_measurement_emit_fmt(vdev->instance, mbufs[i],
                                                 DP_TRACE_MEASUREMENT_TYPE_TRACE, "app_recv",
                                                 "rx, dev=%s, qid=%u", vdev->devsym, qid);
        }
    }

    rx_buffer->curser += nr_mbufs_out;
    return (int)nr_mbufs_out;
}

int marsio_recv_all_burst(struct mr_instance * instance, queue_id_t qid,
                          marsio_buff_t * mbufs[], int nr_mbufs)
{
    /* round-robin across vdevs: each call services the next vdev in turn */
    instance->recv_all_state[qid] = (instance->recv_all_state[qid] + 1) % instance->nr_vdevs;
    unsigned int state = instance->recv_all_state[qid];

    struct mr_vdev * vdev = &instance->vdevs[state];
    if (unlikely(vdev->nr_rxstream == 0)) {
        return 0;
    }
    return marsio_recv_burst(vdev, qid, mbufs, nr_mbufs);
}
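/*
 * Usage sketch (illustrative only): a minimal receive loop built on
 * marsio_recv_burst(). The guard macro MARSIO_USAGE_SKETCH and the consumer
 * app_handle_packet() are hypothetical, not part of this module; the sketch
 * assumes the caller already owns an opened struct mr_vdev and that ownership
 * of received buffers is released with marsio_buff_free_v2() once the
 * application is done with them.
 */
#ifdef MARSIO_USAGE_SKETCH
static void example_rx_loop(struct mr_instance * instance, struct mr_vdev * vdev, queue_id_t qid)
{
    marsio_buff_t * mbufs[MR_BURST_MAX];

    for (;;) {
        /* up to MR_BURST_MAX packets per call; 0 means the queue was empty */
        int n = marsio_recv_burst(vdev, qid, mbufs, MR_BURST_MAX);
        for (int i = 0; i < n; i++) {
            app_handle_packet(mbufs[i]);              /* hypothetical consumer */
        }
        if (n > 0) {
            marsio_buff_free_v2(instance, mbufs, n);  /* release ownership */
        }
    }
}
#endif /* MARSIO_USAGE_SKETCH */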
int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid)
{
    struct mr_vdev_tx_buffer * tx_buffer = vdev->tx_buffer[sid];
    struct mr_instance * instance = vdev->instance;

    if (tx_buffer->length == 0) {
        return 0;
    }

    hash_t hash[MR_BURST_MAX];
    for (int i = 0; i < tx_buffer->length; i++) {
        hash[i] = tx_buffer->mbufs[i]->hash.usr;
        if (marsio_dp_trace_measurements_can_emit(instance, tx_buffer->mbufs[i],
                                                  DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)) {
            marsio_dp_trace_measurement_emit_fmt(instance, tx_buffer->mbufs[i],
                                                 DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "app_send",
                                                 "tx, dev=%s, qid=%u", vdev->devsym, sid);
        }
        if (marsio_dp_trace_measurements_can_emit(instance, tx_buffer->mbufs[i],
                                                  DP_TRACE_MEASUREMENT_TYPE_TRACE)) {
            marsio_dp_trace_measurement_emit_fmt(instance, tx_buffer->mbufs[i],
                                                 DP_TRACE_MEASUREMENT_TYPE_TRACE, "app_send",
                                                 "tx, dev=%s, qid=%u, hash=%u", vdev->devsym, sid, hash[i]);
        }
    }

    int ret = vnode_mirror_enqueue_bulk(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs,
                                        hash, (int)tx_buffer->length);
    if (ret < 0) {
        return ret;
    }

    /* free the mbufs returned as backpressure from the service */
    ret = vnode_mirror_rt_object_retrieve(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs,
                                          (int)tx_buffer->length);
    if (ret > 0) {
        marsio_buff_free_v2(instance, (marsio_buff_t **)tx_buffer->mbufs, ret);
    }

    tx_buffer->length = 0;
    return ret;
}

int marsio_send_burst_with_buffer(struct mr_vdev * vdev, queue_id_t sid, marsio_buff_t * mbuf)
{
    struct mr_vdev_tx_buffer * tx_buffer = vdev->tx_buffer[sid];

    tx_buffer->mbufs[tx_buffer->length++] = mbuf;
    if (tx_buffer->length < tx_buffer->size) {
        return 0;
    }
    /* buffer full: flush it */
    return marsio_send_buffer_flush(vdev, sid);
}

int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff_t * mbufs[], int nr_mbufs)
{
    for (int i = 0; i < nr_mbufs; i++) {
        __rte_mbuf_sanity_check(mbufs[i], 1);
    }
    for (int i = 0; i < nr_mbufs; i++) {
        marsio_send_burst_with_buffer(sendpath->vdev, sid, mbufs[i]);
    }
    return RT_SUCCESS;
}

int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid,
                                   marsio_buff_t * mbufs[], int nr_mbufs, uint16_t options)
{
    struct rte_mbuf ** _mbufs = (struct rte_mbuf **)mbufs;

    PROTECT_rte_mbuf_unpoison_bulk(_mbufs, nr_mbufs);

    for (int i = 0; i < nr_mbufs; i++) {
        __rte_mbuf_sanity_check(_mbufs[i], 1);
    }

    if (options & MARSIO_SEND_OPT_NO_FREE) {
        /* take an extra reference so the caller retains ownership */
        for (int i = 0; i < nr_mbufs; i++)
            rte_pktmbuf_refcnt_update(_mbufs[i], 1);
    }

    if (options & MARSIO_SEND_OPT_REHASH) {
        for (int i = 0; i < nr_mbufs; i++)
            marsio_buff_do_rehash(sendpath->instance, mbufs[i]);
    }

    hash_t hash[MR_BURST_MAX];
    for (int i = 0; i < nr_mbufs; i++) {
        hash[i] = _mbufs[i]->hash.usr;
    }

    /* per-thread send statistics */
    /* thread_info.instance may be NULL because marsio_thread_init may not have run yet */
    if (thread_info.instance != NULL) {
        thread_id_t tid = thread_info.thread_id;
        thread_info.instance->stat[tid].packet_send_count += nr_mbufs;
        thread_info.instance->stat[tid].packet_send_length += packet_total_len(_mbufs, nr_mbufs);
    }

    for (int i = 0; i < nr_mbufs; i++) {
        if (marsio_dp_trace_measurements_can_emit(sendpath->instance, mbufs[i],
                                                  DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)) {
            marsio_dp_trace_measurement_emit_fmt(sendpath->instance, mbufs[i],
                                                 DP_TRACE_MEASUREMENT_TYPE_TELEMETRY, "app_send",
                                                 "tx, dev=%s, qid=%u", sendpath->vdev->devsym, sid);
        }
        if (marsio_dp_trace_measurements_can_emit(sendpath->instance, mbufs[i],
                                                  DP_TRACE_MEASUREMENT_TYPE_TRACE)) {
            marsio_dp_trace_measurement_emit_fmt(sendpath->instance, mbufs[i],
                                                 DP_TRACE_MEASUREMENT_TYPE_TRACE, "app_send",
                                                 "tx, dev=%s, qid=%u, hash=%u", sendpath->vdev->devsym, sid, hash[i]);
        }
    }

    vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, _mbufs, hash, nr_mbufs);

    int ret = vnode_mirror_rt_object_retrieve(sendpath->target_vdi->vnode_tx_prod, sid, _mbufs, nr_mbufs);
    if (ret > 0) {
        marsio_buff_free_v2(sendpath->instance, mbufs, ret);
    }
    return RT_SUCCESS;
}
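/*
 * Usage sketch (illustrative only): sending a burst with options through a
 * sendpath. MARSIO_USAGE_SKETCH and example_tx() are hypothetical. With
 * MARSIO_SEND_OPT_NO_FREE the refcount is bumped above, so the caller keeps
 * ownership of the mbufs and must free them itself later.
 */
#ifdef MARSIO_USAGE_SKETCH
static void example_tx(struct mr_sendpath * sendpath, queue_id_t sid,
                       marsio_buff_t * mbufs[], int nr_mbufs)
{
    /* re-hash before enqueue and keep our own reference on every mbuf */
    marsio_send_burst_with_options(sendpath, sid, mbufs, nr_mbufs,
                                   MARSIO_SEND_OPT_REHASH | MARSIO_SEND_OPT_NO_FREE);
}
#endif /* MARSIO_USAGE_SKETCH */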
void marsio_send_burst_flush(struct mr_sendpath * sendpath, queue_id_t sid)
{
    /* no-op: the sendpath send functions above enqueue immediately */
    (void)sendpath;
    (void)sid;
}

int marsio_poll_register_eventfd(struct mr_instance * instance, int eventfd, unsigned int tid)
{
    struct rte_epoll_event * epoll_event = rte_zmalloc(NULL, sizeof(struct rte_epoll_event), 0);
    if (epoll_event == NULL) {
        return -ENOMEM;
    }

    epoll_event->epdata.event = EPOLLIN;
    epoll_event->epdata.data = NULL;
    return rte_epoll_ctl(instance->rx_notify_epfd[tid], EPOLL_CTL_ADD, eventfd, epoll_event);
}

static void clear_all_zero_recv_counters(struct mr_vdev * vdevs[], unsigned int nr_vdevs, unsigned int tid)
{
    for (unsigned int i = 0; i < nr_vdevs; i++) {
        struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
        vdev_counter->rx_burst_zero_counters = 0;
        vdev_counter->rx_burst_call_counters = 0;
    }
}

int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[],
                     unsigned int nr_vdevs, unsigned int tid, int timeout)
{
    eventfd_t _not_use_eventfd_value;

    /* flush every vdev's tx buffer before a potential sleep */
    for (unsigned int i = 0; i < nr_vdevs; i++) {
        marsio_send_buffer_flush(vdevs[i], tid);
    }

    if (instance->en_notify == 0) {
        return 0;
    }

    unsigned int min_rx_burst_zero_counters = UINT32_MAX;
    for (unsigned int i = 0; i < nr_vdevs; i++) {
        struct mr_vdev_counters * vdev_counter = &vdevs[i]->counters[tid];
        min_rx_burst_zero_counters = RTE_MIN(min_rx_burst_zero_counters,
                                             vdev_counter->rx_burst_zero_counters);
    }

    /* still busy: keep polling */
    if (min_rx_burst_zero_counters < instance->zero_recv_usleep_threshold) {
        return 0;
    }

    /* moderately idle: back off with a short sleep instead of blocking */
    if (min_rx_burst_zero_counters < instance->zero_recv_notify_threshold) {
        rte_delay_us_sleep(instance->zero_recv_usleep_period);
        return 0;
    }

    /* set the notification status to waiting */
    for (unsigned int i = 0; i < nr_vdevs; i++) {
        struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
        assert(cons_notify[tid].cons_running_status == CONS_STATUS_RUNNING);
        cons_notify[tid].cons_running_status = CONS_STATUS_WAITING;
    }

    /* wait for an event until the timeout expires */
    struct rte_epoll_event epoll_events[16];
    int n = rte_epoll_wait(instance->rx_notify_epfd[tid], epoll_events, RTE_DIM(epoll_events), timeout);
    if (unlikely(n < 0)) {
        MR_ERROR("rte_epoll_wait failed: errno=%d.", errno);
        return -1;
    }

    /* back from the wait: mark the consumers running again */
    for (unsigned int i = 0; i < nr_vdevs; i++) {
        struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
        assert(cons_notify[tid].cons_running_status == CONS_STATUS_WAITING);
        cons_notify[tid].cons_running_status = CONS_STATUS_RUNNING;
    }

    clear_all_zero_recv_counters(vdevs, nr_vdevs, tid);

    /* drain the eventfd counters and discard the values */
    for (int i = 0; i < n; i++) {
        struct rte_epoll_event * epoll_event = &epoll_events[i];
        eventfd_read(epoll_event->fd, &_not_use_eventfd_value);
    }

    if (n > 0) {
        instance->stat[tid].poll_wake_up_by_ev++;
    } else {
        instance->stat[tid].poll_wake_up_timeout++;
    }
    return 0;
}
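/*
 * Usage sketch (illustrative only): a thread main loop combining the recv and
 * poll APIs above. MARSIO_USAGE_SKETCH, app_process() and the 100 ms timeout
 * are hypothetical, and the sketch assumes the per-thread queue id equals the
 * thread id, as the counter indexing above suggests. marsio_poll_wait()
 * flushes pending tx buffers and, once every vdev has been idle long enough,
 * blocks on the per-thread epoll fd.
 */
#ifdef MARSIO_USAGE_SKETCH
static void example_poll_loop(struct mr_instance * instance, struct mr_vdev * vdevs[],
                              unsigned int nr_vdevs, unsigned int tid)
{
    marsio_buff_t * mbufs[MR_BURST_MAX];

    for (;;) {
        for (unsigned int i = 0; i < nr_vdevs; i++) {
            int n = marsio_recv_burst(vdevs[i], tid, mbufs, MR_BURST_MAX);
            for (int j = 0; j < n; j++) {
                app_process(vdevs[i], tid, mbufs[j]);  /* hypothetical consumer */
            }
        }
        /* busy-poll, usleep, or block depending on the zero-recv counters */
        marsio_poll_wait(instance, vdevs, nr_vdevs, tid, 100 /* ms */);
    }
}
#endif /* MARSIO_USAGE_SKETCH */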