/*
 * NOTE(review): the header names of the ten #include directives below are
 * missing from this copy of the file. Restore them from version control;
 * they are kept here only so that no directive is silently dropped.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/*
 * Sum rte_pktmbuf_pkt_len() over the first nr_mbufs entries of mbufs.
 *
 * The sum is accumulated as unsigned int and converted to int on return.
 */
static inline int __packet_total_len(struct rte_mbuf * mbufs[], unsigned int nr_mbufs)
{
	unsigned int total_len = 0;

	for (unsigned int i = 0; i < nr_mbufs; i++)
		total_len += rte_pktmbuf_pkt_len(mbufs[i]);

	return total_len;
}

/*
 * Enqueue a burst of packets on the vdi->vnode_ltx_prod producer, serialized
 * by a function-local spinlock.
 *
 * Every packet is tagged with qid as its hash value.
 *
 * Returns the result of vnode_mirror_enqueue_bulk(), or -EINVAL when nr_mbufs
 * exceeds MR_BURST_MAX (in which case the mbufs are left untouched).
 */
int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid,
	struct rte_mbuf * mbufs[], int nr_mbufs)
{
	hash_t mbufs_hash[MR_BURST_MAX];

	/* mbufs_hash[] holds MR_BURST_MAX entries; a larger burst would overrun it. */
	assert(nr_mbufs <= MR_BURST_MAX);
	if (unlikely(nr_mbufs > MR_BURST_MAX))
		return -EINVAL;

	for (int i = 0; i < nr_mbufs; i++)
	{
		mbufs_hash[i] = qid;
		__rte_mbuf_sanity_check(mbufs[i], 1);
	}

	/*
	 * Fast-send lock. Fast packets may be sent from any context.
	 * The lock is implemented in the application, so an application crash
	 * does not affect the main process. The number of fast-sent packets is
	 * expected to be small, so this lock should not cause performance problems.
	 */
	/* TODO: move the lock somewhere else */
	static rte_spinlock_t __f_fast_lock = { 0 };

	rte_spinlock_lock(&__f_fast_lock);
	int ret = vnode_mirror_enqueue_bulk(vdi->vnode_ltx_prod, qid, mbufs, mbufs_hash, nr_mbufs);
	rte_spinlock_unlock(&__f_fast_lock);

	return ret;
}

/*
 * Receive up to nr_mbufs packets from RX queue qid of vdev.
 *
 * The received burst is handed to the ARP and ICMP entry points when the
 * device has them enabled, sanity-checked, accounted in the per-thread
 * statistics and, if a BPF dumper is attached, written to it.
 *
 * Returns the value returned by vnode_mirror_dequeue_burst(), or -EINVAL
 * when qid is not a valid RX stream of the device.
 */
int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbufs[], int nr_mbufs)
{
	assert(qid < vdev->nr_rxstream);
	if (unlikely(qid >= vdev->nr_rxstream))
		return -EINVAL;

	struct vdev_instance * vdi = vdev->vdi;
	struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs;

	int ret = vnode_mirror_dequeue_burst(vdi->vnode_rx_cons, qid, __mbufs, nr_mbufs);

	/*
	 * Nothing received. A negative result is also returned as-is here:
	 * letting it through would turn into a huge unsigned count inside
	 * __packet_total_len() and walk far past the end of the array.
	 */
	if (unlikely(ret <= 0))
		goto out;

	/* ARP */
	if (vdev->en_arp)
		arp_entry(vdev->instance, vdi, qid, __mbufs, ret);

	/* ICMP */
	if (vdev->en_icmp)
		icmp_entry(vdev->instance, vdi, qid, __mbufs, ret);

	/*
	 * Packet sanity check, guards against double-free.
	 * Every array entry is the head of a packet, so is_header is always 1
	 * (the loop index must not be used as the flag).
	 */
	for (int i = 0; i < ret; i++)
		__rte_mbuf_sanity_check(__mbufs[i], 1);

	/* Per-thread receive statistics; threads that are not registered are not counted. */
	if (thread_info.instance != NULL)
	{
		thread_id_t tid = thread_info.thread_id;
		thread_info.instance->stat[tid].packet_recv_count += ret;
		/* Accumulate, like the packet counter above, instead of overwriting. */
		thread_info.instance->stat[tid].packet_recv_length += __packet_total_len(__mbufs, ret);
	}

	/* BPF Dumper */
	if (unlikely(vdev->bpf_dumper != NULL))
		bpf_dumper_write(vdev->bpf_dumper, __mbufs, ret);

out:
	return ret;
}

/*
 * Receive from the devices of instance in rotation.
 *
 * Each call advances instance->recv_all_state[qid] to the next device index
 * (modulo nr_vdevs) and polls that single device on queue qid.
 *
 * Returns 0 when the instance has no devices or the selected device has no
 * RX streams, otherwise the result of marsio_recv_burst().
 */
int marsio_recv_all_burst(struct mr_instance * instance, queue_id_t qid,
	marsio_buff_t * mbufs[], int nr_mbufs)
{
	/* Without devices the modulo below would divide by zero. */
	if (unlikely(instance->nr_vdevs == 0))
		return 0;

	instance->recv_all_state[qid] = (instance->recv_all_state[qid] + 1) % instance->nr_vdevs;
	unsigned int state = instance->recv_all_state[qid];

	struct mr_vdev * vdev = &instance->vdevs[state];
	if (unlikely(vdev->nr_rxstream == 0))
		return 0;

	return marsio_recv_burst(vdev, qid, mbufs, nr_mbufs);
}

/*
 * For send paths that need packet construction, run the construction steps.
 *
 * Steps run in this order, each only when its callback is set: pre-build
 * hook, re-query (only when the send path is not marked usable), L4, L3,
 * L2 construction, post-build hook. The first step that returns a negative
 * value aborts the sequence.
 *
 * Returns RT_SUCCESS when all steps succeed, RT_ERR otherwise.
 */
static int __send_burst_packet_construct(struct mr_sendpath * sendpath, queue_id_t sid,
	marsio_buff_t * mbufs[], int nr_mbufs)
{
	struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs;

	if (sendpath->fn_prebuild_hook != NULL &&
		sendpath->fn_prebuild_hook(sendpath, __mbufs, nr_mbufs, sendpath->prebuild_hook_args) < 0)
		return RT_ERR;

	/* Check whether this send path is usable */
	if (!sendpath->can_use && sendpath->fn_requery(sendpath) < 0)
		return RT_ERR;

	/* Packet construction */
	if (sendpath->fn_l4_construct != NULL &&
		sendpath->fn_l4_construct(sendpath, __mbufs, nr_mbufs) < 0)
		return RT_ERR;

	if (sendpath->fn_l3_construct != NULL &&
		sendpath->fn_l3_construct(sendpath, __mbufs, nr_mbufs) < 0)
		return RT_ERR;

	if (sendpath->fn_l2_construct != NULL &&
		sendpath->fn_l2_construct(sendpath, __mbufs, nr_mbufs) < 0)
		return RT_ERR;

	if (sendpath->fn_postbuild_hook != NULL &&
		sendpath->fn_postbuild_hook(sendpath, __mbufs, nr_mbufs, sendpath->postbuild_hook_args) < 0)
		return RT_ERR;

	return RT_SUCCESS;
}

/*
 * Send a burst of packets through sendpath on TX queue sid.
 *
 * On success the packets are enqueued on the target device's
 * vnode_tx_prod producer and RT_SUCCESS is returned.
 *
 * On failure (packet construction failed, or nr_mbufs exceeds
 * MR_BURST_MAX) every packet is released with rte_pktmbuf_free(), the
 * per-thread drop counter is increased and RT_ERR is returned.
 */
int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff_t * mbufs[], int nr_mbufs)
{
	struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs;
	hash_t hash[MR_BURST_MAX];

	/* Every array entry is the head of a packet, so is_header is always 1. */
	for (int i = 0; i < nr_mbufs; i++)
		__rte_mbuf_sanity_check(__mbufs[i], 1);

	/* hash[] holds MR_BURST_MAX entries; treat an oversized burst as a send failure. */
	if (unlikely(nr_mbufs > MR_BURST_MAX))
		goto err;

	if (__send_burst_packet_construct(sendpath, sid, mbufs, nr_mbufs) < 0)
		goto err;

	/* Take the distribution hash from the hash value carried by each packet */
	for (int i = 0; i < nr_mbufs; i++)
		hash[i] = __mbufs[i]->hash.usr;

	/* BPF Dumper */
	if (unlikely(sendpath->vdev->bpf_dumper != NULL))
		bpf_dumper_write(sendpath->vdev->bpf_dumper, __mbufs, nr_mbufs);

	vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, __mbufs, hash, nr_mbufs);

	/* Per-thread send statistics */
	if (thread_info.instance != NULL)
	{
		thread_id_t tid = thread_info.thread_id;
		thread_info.instance->stat[tid].packet_send_count += nr_mbufs;
		/* Accumulate, like the packet counter above, instead of overwriting. */
		thread_info.instance->stat[tid].packet_send_length += __packet_total_len(__mbufs, nr_mbufs);
	}

	return RT_SUCCESS;

err:
	/* Per-thread drop statistics */
	if (thread_info.instance != NULL)
	{
		thread_id_t tid = thread_info.thread_id;
		thread_info.instance->stat[tid].packet_send_drop += nr_mbufs;
	}

	for (int i = 0; i < nr_mbufs; i++)
		rte_pktmbuf_free(__mbufs[i]);

	return RT_ERR;
}

/*
 * Send a burst of packets through sendpath on TX queue sid, with options.
 *
 * Options (bit flags):
 *   MARSIO_SEND_OPT_NO_FREE  raise each packet's reference count by one
 *                            before sending.
 *   MARSIO_SEND_OPT_CTRL     set CTRL_MBUF_FLAG in each packet's ol_flags.
 *   MARSIO_SEND_OPT_REHASH   run distributer_caculate() on the burst before
 *                            the per-packet hash is read.
 *   MARSIO_SEND_OPT_FAST     enqueue on vnode_ftx_prod instead of
 *                            vnode_tx_prod.
 *
 * Returns RT_SUCCESS on success. On failure (packet construction failed, or
 * nr_mbufs exceeds MR_BURST_MAX) every packet is passed to
 * rte_pktmbuf_free(), the per-thread drop counter is increased and RT_ERR
 * is returned.
 */
int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid,
	marsio_buff_t * mbufs[], int nr_mbufs, uint16_t options)
{
	struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs;
	hash_t hash[MR_BURST_MAX];

	assert(sid < sendpath->target_vdi->nr_txstream);

	for (int i = 0; i < nr_mbufs; i++)
	{
		assert(__mbufs[i]->data_len != 0 && __mbufs[i]->pkt_len != 0);
		/* Every array entry is the head of a packet, so is_header is always 1. */
		__rte_mbuf_sanity_check(__mbufs[i], 1);
	}

	if (options & MARSIO_SEND_OPT_NO_FREE)
	{
		for (int i = 0; i < nr_mbufs; i++)
			rte_pktmbuf_refcnt_update(__mbufs[i], 1);
	}

	if (options & MARSIO_SEND_OPT_CTRL)
	{
		for (int i = 0; i < nr_mbufs; i++)
			__mbufs[i]->ol_flags |= CTRL_MBUF_FLAG;
	}

	/*
	 * hash[] holds MR_BURST_MAX entries; treat an oversized burst as a send
	 * failure. The check sits after the NO_FREE reference bump so that the
	 * free in the error path stays balanced with it.
	 */
	if (unlikely(nr_mbufs > MR_BURST_MAX))
		goto err;

	if (__send_burst_packet_construct(sendpath, sid, mbufs, nr_mbufs) < 0)
		goto err;

	if (options & MARSIO_SEND_OPT_REHASH)
		distributer_caculate(sendpath->instance->dist_object, __mbufs, nr_mbufs);

	for (int i = 0; i < nr_mbufs; i++)
		hash[i] = __mbufs[i]->hash.usr;

	/* BPF Dumper */
	if (unlikely(sendpath->vdev->bpf_dumper != NULL))
		bpf_dumper_write(sendpath->vdev->bpf_dumper, __mbufs, nr_mbufs);

	if (options & MARSIO_SEND_OPT_FAST)
		vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_ftx_prod, sid, __mbufs, hash, nr_mbufs);
	else
		vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, __mbufs, hash, nr_mbufs);

	/* Per-thread send statistics */
	if (thread_info.instance != NULL)
	{
		thread_id_t tid = thread_info.thread_id;
		thread_info.instance->stat[tid].packet_send_count += nr_mbufs;
		/* Accumulate, like the packet counter above, instead of overwriting. */
		thread_info.instance->stat[tid].packet_send_length += __packet_total_len(__mbufs, nr_mbufs);
	}

	return RT_SUCCESS;

err:
	/* Per-thread drop statistics */
	if (thread_info.instance != NULL)
	{
		thread_id_t tid = thread_info.thread_id;
		thread_info.instance->stat[tid].packet_send_drop += nr_mbufs;
	}

	for (int i = 0; i < nr_mbufs; i++)
		rte_pktmbuf_free(__mbufs[i]);

	return RT_ERR;
}

/* Flush both the normal (vnode_tx_prod) and fast (vnode_ftx_prod) producers of queue sid. */
void marsio_send_burst_flush(struct mr_sendpath * sendpath, queue_id_t sid)
{
	vnode_mirror_flush(sendpath->target_vdi->vnode_tx_prod, sid);
	vnode_mirror_flush(sendpath->target_vdi->vnode_ftx_prod, sid);
}

/* Idle-poll hook; currently does nothing and always returns 0. */
int marsio_idle_poll(struct mr_instance * instance)
{
	return 0;
}

/*
 * Prepend a UDP header to buff.
 *
 * s_port and d_port are given in host byte order. The datagram length is
 * the packet length after the header has been prepended. The checksum is
 * left at 0 and PKT_TX_L4_NO_CKSUM is set in ol_flags.
 *
 * Returns 0 on success, RT_ERR when the header could not be prepended.
 */
int marsio_udp_header_construct(marsio_buff_t * buff, uint16_t s_port, uint16_t d_port)
{
	struct rte_mbuf * mbuf = (struct rte_mbuf *)buff;
	struct udp_hdr * udp_hdr = (struct udp_hdr *)rte_pktmbuf_prepend(mbuf, sizeof(struct udp_hdr));

	/* The assert vanishes under NDEBUG, so also fail at run time instead of dereferencing NULL. */
	assert(udp_hdr != NULL);
	if (unlikely(udp_hdr == NULL))
		return RT_ERR;

	unsigned int l4_len = rte_pktmbuf_pkt_len(mbuf);

	udp_hdr->src_port = htons(s_port);
	udp_hdr->dst_port = htons(d_port);
	udp_hdr->dgram_len = htons(l4_len);
	udp_hdr->dgram_cksum = 0;

	/* Reset all offload fields first, then record only the L4 header length. */
	mbuf->tx_offload = 0;
	mbuf->l4_len = sizeof(struct udp_hdr);
	mbuf->ol_flags |= PKT_TX_L4_NO_CKSUM;

	return 0;
}

/* Pack the IPv4 version (high nibble) and header length in 32-bit words (low nibble). */
#define __IP_VERSION_IHL(version, len) (((version) << 4) | ((len) << 0))
#define __IP_TTL 75

/*
 * Prepend an IPv4 header to buff.
 *
 * s_ip and d_ip are given in host byte order; proto is the next-protocol
 * id. The total length is the packet length after the header has been
 * prepended. The packet id comes from a per-thread counter, and the header
 * checksum is computed with rte_ipv4_cksum().
 *
 * Returns 0 on success, RT_ERR when the header could not be prepended.
 */
int marsio_ipv4_header_construct(marsio_buff_t * buff, uint32_t s_ip, uint32_t d_ip, uint8_t proto)
{
	struct rte_mbuf * mbuf = (struct rte_mbuf *)buff;
	struct ipv4_hdr * ip_hdr = (struct ipv4_hdr *)rte_pktmbuf_prepend(mbuf, sizeof(struct ipv4_hdr));

	/* The assert vanishes under NDEBUG, so also fail at run time instead of dereferencing NULL. */
	assert(ip_hdr != NULL);
	if (unlikely(ip_hdr == NULL))
		return RT_ERR;

	uint16_t datalen = rte_pktmbuf_pkt_len(mbuf);

	static uint32_t __thread __ip_counter = 0;

	ip_hdr->version_ihl = __IP_VERSION_IHL(4, sizeof(struct ipv4_hdr) / 4);
	ip_hdr->type_of_service = 0;
	ip_hdr->total_length = rte_cpu_to_be_16(datalen);
	ip_hdr->packet_id = ++__ip_counter;
	ip_hdr->fragment_offset = 0;
	ip_hdr->time_to_live = __IP_TTL;
	ip_hdr->next_proto_id = proto;
	ip_hdr->src_addr = rte_cpu_to_be_32(s_ip);
	ip_hdr->dst_addr = rte_cpu_to_be_32(d_ip);

	/* The checksum field must be zero while the checksum is being computed. */
	ip_hdr->hdr_checksum = 0;
	ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr);

	return 0;
}