diff options
| author | 陆秋文 <[email protected]> | 2024-04-01 15:59:57 +0800 |
|---|---|---|
| committer | 陆秋文 <[email protected]> | 2024-04-14 12:38:19 +0000 |
| commit | 40ac816d10a9335c61090208116342bfa3fefd88 (patch) | |
| tree | 9d719a0206575b7a5ffdf916a3aad665b0970885 | |
| parent | 7dfa9e3812f312e4b01c6be643313142d1fd53b7 (diff) | |
Feature retrieve object of backpressure
| -rw-r--r-- | app/include/mrapp.h | 6 | ||||
| -rw-r--r-- | app/include/neigh.h | 53 | ||||
| -rw-r--r-- | app/src/arp.c | 204 | ||||
| -rw-r--r-- | app/src/icmp.c | 98 | ||||
| -rw-r--r-- | app/src/monit.c | 6 | ||||
| -rw-r--r-- | app/src/mrb.c | 35 | ||||
| -rw-r--r-- | app/src/neigh.c | 317 | ||||
| -rw-r--r-- | app/src/rawio.c | 69 | ||||
| -rw-r--r-- | app/src/sendpath.c | 238 | ||||
| -rw-r--r-- | app/src/tap.c | 6 | ||||
| -rw-r--r-- | include/external/marsio.h | 2 | ||||
| -rw-r--r-- | infra/include/vnode.h | 19 | ||||
| -rw-r--r-- | infra/src/vnode_common.c | 201 | ||||
| -rw-r--r-- | infra/src/vnode_common.h | 25 | ||||
| -rw-r--r-- | infra/src/vnode_mirror.c | 185 | ||||
| -rw-r--r-- | infra/test/TestVNode.cc | 187 | ||||
| -rw-r--r-- | service/include/sc_vdev.h | 40 | ||||
| -rw-r--r-- | service/src/node_shmdev.c | 22 | ||||
| -rw-r--r-- | service/src/vdata.c | 47 | ||||
| -rw-r--r-- | service/src/vdev.c | 24 |
20 files changed, 416 insertions, 1368 deletions
diff --git a/app/include/mrapp.h b/app/include/mrapp.h index 3e6da2c..d9f2c5c 100644 --- a/app/include/mrapp.h +++ b/app/include/mrapp.h @@ -4,7 +4,6 @@ #include <ldbc.h> #include <marsio.h> #include <mr_rte_msg.h> -#include <neigh.h> #include <pcap/pcap.h> #include <rte_epoll.h> #include <tap.h> @@ -116,8 +115,6 @@ struct mr_instance struct mr_vdev vdevs[MR_VDEV_MAX]; /* 虚设备实例数量 */ unsigned int nr_vdevs; - /* 邻居管理器 */ - struct neighbour_manager * neigh; /* 负载均衡器 */ struct distributer * dist_object; /* 静态邻居表 */ @@ -184,4 +181,5 @@ int mrapp_monit_loop(struct mr_instance * instance); int mrapp_packet_send_burst(struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs); -int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs); +int mrapp_packet_fast_send_burst(struct mr_instance * instance, + struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs); diff --git a/app/include/neigh.h b/app/include/neigh.h deleted file mode 100644 index 18aeae5..0000000 --- a/app/include/neigh.h +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" { -#endif - -#include <rte_atomic.h> -#include <rte_rwlock.h> -#include <vdev_define.h> -#include <cJSON.h> -#include <MESA_htable.h> - -struct neighbour; -TAILQ_HEAD(neighbour_list, neighbour); - -struct neighbour_manager -{ - /* 读写锁 */ - rte_spinlock_t lock; - /* 查询哈希表 */ - MESA_htable_handle table; - /* 待查询表项队列 */ - struct neighbour_list in_none_list; - /* 待查询表项队列长度(不受rwlock的保护) */ - rte_atomic16_t in_none_list_len; - /* 最大表项数量 */ - unsigned int max_entries; - /* 最大待查询表项数 */ - unsigned int max_queue_entries; - /* 表项老化时间 */ - unsigned short t_timeout; - /* ARP发送频率 */ - unsigned short t_arp_send; -}; - -int neighbour_mamanger_init(struct neighbour_manager * object, - const char * symbol, unsigned int max_entries, unsigned short t_timeout, unsigned int t_arp_send); - -int neighbour_mamanger_deinit(struct neighbour_manager * object); - -int neigh_create_or_update(struct neighbour_manager * object, struct in_addr in_addr, - struct rte_ether_addr * eth_addr, struct vdev_instance * vdi, unsigned short en_permanent); - -int neigh_delete(struct neighbour_manager * object, struct in_addr in_addr); - -int neigh_query(struct neighbour_manager * object, struct in_addr in_addr, - struct rte_ether_addr * out_ether_addr, struct vdev_instance ** vdi); - -cJSON * neighbour_manager_monit(struct neighbour_manager * object); - -#ifdef __cplusplus -} -#endif
\ No newline at end of file diff --git a/app/src/arp.c b/app/src/arp.c deleted file mode 100644 index 538bdca..0000000 --- a/app/src/arp.c +++ /dev/null @@ -1,204 +0,0 @@ -/* \brief 简单协议栈ARP/RARP协议处理模块 - * - * 处理ARP/RARP协议报文数据 - * - * \author Lu Qiuwen<[email protected]> - * \date 2016-10-21 - */ - -#include <arp.h> -#include <common.h> -#include <mrapp.h> -#include <neigh.h> -#include <netinet/in.h> -#include <protect.h> -#include <rte_arp.h> -#include <rte_malloc.h> -#include <rte_version.h> -#include <vdev_define.h> - -static struct rte_ether_addr broadcast_hwaddr = {{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}}; - -static inline int __local_frame_filter(const struct vdev * vdev, const struct rte_ether_addr * ether_addr) -{ - if (rte_is_broadcast_ether_addr(ether_addr)) - return 1; - if (rte_is_same_ether_addr(&vdev->ether_addr, ether_addr)) - return 1; - return 0; -} - -static inline int __local_inaddr_filter(const struct vdev * vdev, const struct in_addr * in_addr) -{ - return (vdev->in_addr.s_addr == in_addr->s_addr); -} - -static void arp_reply_entry(struct neighbour_manager * neigh_manager, struct vdev_instance * vdi, queue_id_t qid, - struct rte_mbuf * mbuf, struct rte_arp_hdr * arp_header) -{ - struct in_addr s_in_addr; - struct in_addr d_in_addr; - memcpy(&s_in_addr, &arp_header->arp_data.arp_sip, sizeof(struct in_addr)); - memcpy(&d_in_addr, &arp_header->arp_data.arp_tip, sizeof(struct in_addr)); - struct rte_ether_addr * s_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_sha; - struct rte_ether_addr * d_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_tha; - - // 检测目的IP地址、MAC地址是否是本机的 - if (!__local_frame_filter(vdi->vdev, d_eth_addr)) - goto invalid_frame; - if (!__local_inaddr_filter(vdi->vdev, &d_in_addr)) - goto invalid_frame; - - // TODO: 错误处理 - neigh_create_or_update(neigh_manager, s_in_addr, s_eth_addr, vdi, 0); - -invalid_frame: - return; -} - -static void arp_request_entry(struct neighbour_manager * neigh_manager, struct vdev_instance * vdi, queue_id_t qid, - struct rte_mbuf * mbuf, struct rte_arp_hdr * arp_header) -{ - struct in_addr s_in_addr; - struct in_addr d_in_addr; - memcpy(&s_in_addr, &arp_header->arp_data.arp_sip, sizeof(struct in_addr)); - memcpy(&d_in_addr, &arp_header->arp_data.arp_tip, sizeof(struct in_addr)); - struct rte_ether_addr * s_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_sha; - struct rte_ether_addr * d_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_tha; - - // 过滤非广播报文和非目的MAC是本机的报文 - if (!(rte_is_zero_ether_addr(d_eth_addr) || __local_frame_filter(vdi->vdev, d_eth_addr))) - goto done; - - // 根据广播的ARP报文,更新邻居表 - neigh_create_or_update(neigh_manager, s_in_addr, s_eth_addr, vdi, 0); - - // 对请求是本机的,进行响应 - if (!__local_inaddr_filter(vdi->vdev, &d_in_addr)) - goto done; - - struct rte_mbuf * reply_mbuf = PROTECT_rte_pktmbuf_alloc(vdi->direct_pool); - if (unlikely(reply_mbuf == NULL)) - goto done; - - // 构造以太网头 - struct rte_ether_hdr * ether_hdr = - (struct rte_ether_hdr *)PROTECT_rte_pktmbuf_append(reply_mbuf, sizeof(struct rte_ether_hdr)); - -#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION - rte_ether_addr_copy(&vdi->vdev->ether_addr, ðer_hdr->src_addr); - rte_ether_addr_copy(&arp_header->arp_data.arp_sha, ðer_hdr->dst_addr); -#else - rte_ether_addr_copy(&vdi->vdev->ether_addr, ðer_hdr->s_addr); - rte_ether_addr_copy(&arp_header->arp_data.arp_sha, ðer_hdr->d_addr); -#endif - ether_hdr->ether_type = ntohs(RTE_ETHER_TYPE_ARP); - - // 构造ARP应答 - struct rte_arp_hdr * reply_arp_hdr = - (struct rte_arp_hdr *)PROTECT_rte_pktmbuf_append(reply_mbuf, sizeof(struct rte_arp_hdr)); - - rte_memcpy(reply_arp_hdr, arp_header, sizeof(struct rte_arp_hdr)); - reply_arp_hdr->arp_opcode = ntohs(RTE_ARP_OP_REPLY); - -#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION - rte_ether_addr_copy(ðer_hdr->src_addr, &reply_arp_hdr->arp_data.arp_sha); - rte_ether_addr_copy(ðer_hdr->dst_addr, &reply_arp_hdr->arp_data.arp_tha); -#else - rte_ether_addr_copy(ðer_hdr->s_addr, &reply_arp_hdr->arp_data.arp_sha); - rte_ether_addr_copy(ðer_hdr->d_addr, &reply_arp_hdr->arp_data.arp_tha); -#endif - reply_arp_hdr->arp_data.arp_sip = vdi->vdev->in_addr.s_addr; - reply_arp_hdr->arp_data.arp_tip = s_in_addr.s_addr; - - // 写应答包到线路 - mrapp_packet_fast_send_burst(vdi, qid, &reply_mbuf, 1); - -done: - return; -} - -int arp_entry(struct mr_instance * instance, struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs_in[], - int nr_mbufs_in) -{ - int handled_packets = 0; - - for (int i = 0; i < nr_mbufs_in; i++) - { - struct rte_mbuf * mbuf = mbufs_in[i]; - if (unlikely(mbuf == NULL)) - continue; - - struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); - if (eth_hdr->ether_type != ntohs(RTE_ETHER_TYPE_ARP)) - continue; - - struct rte_arp_hdr * arp_hdr = - rte_pktmbuf_mtod_offset(mbuf, struct rte_arp_hdr *, sizeof(struct rte_ether_hdr)); - - if (arp_hdr->arp_opcode == ntohs(RTE_ARP_OP_REQUEST)) - arp_request_entry(instance->neigh, vdi, qid, mbuf, arp_hdr); - else if (arp_hdr->arp_opcode == ntohs(RTE_ARP_OP_REPLY)) - arp_reply_entry(instance->neigh, vdi, qid, mbuf, arp_hdr); - - handled_packets++; - } - - return handled_packets; -} - -int arp_request_send(struct vdev_instance * vdi, queue_id_t qid, struct in_addr in_addr) -{ - struct vdev * dev_info = vdi->vdev; - if (unlikely(dev_info->enable == 0)) - { - MR_DEBUG("Send ARP request on disable device %s, failed.", dev_info->symbol); - return -EINVAL; - } - - if (unlikely(vdi->nr_txstream == 0)) - { - MR_DEBUG("Send ARP request on recieve only device %s, failed.", dev_info->symbol); - return -EINVAL; - } - - struct rte_ether_addr * src_hwaddr = &dev_info->ether_addr; - struct rte_ether_addr * dst_hwaddr = &broadcast_hwaddr; - - struct in_addr in_addr_src = dev_info->in_addr; - struct rte_mbuf * req_mbuf = PROTECT_rte_pktmbuf_alloc(vdi->direct_pool); - if (unlikely(req_mbuf == NULL)) - return -ENOBUFS; - - // 构造以太网头 - struct rte_ether_hdr * ether_hdr = - (struct rte_ether_hdr *)PROTECT_rte_pktmbuf_append(req_mbuf, sizeof(struct rte_ether_hdr)); - -#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION - rte_ether_addr_copy(src_hwaddr, ðer_hdr->src_addr); - rte_ether_addr_copy(dst_hwaddr, ðer_hdr->dst_addr); -#else - rte_ether_addr_copy(src_hwaddr, ðer_hdr->s_addr); - rte_ether_addr_copy(dst_hwaddr, ðer_hdr->d_addr); -#endif - ether_hdr->ether_type = htons(RTE_ETHER_TYPE_ARP); - - // 构造ARP请求报文 - struct rte_arp_hdr * arp_hdr = - (struct rte_arp_hdr *)PROTECT_rte_pktmbuf_append(req_mbuf, sizeof(struct rte_arp_hdr)); - - arp_hdr->arp_hardware = htons(RTE_ARP_HRD_ETHER); - arp_hdr->arp_protocol = htons(RTE_ETHER_TYPE_IPV4); - arp_hdr->arp_hlen = 6; - arp_hdr->arp_plen = 4; - arp_hdr->arp_opcode = htons(RTE_ARP_OP_REQUEST); - arp_hdr->arp_data.arp_sip = in_addr_src.s_addr; - arp_hdr->arp_data.arp_tip = in_addr.s_addr; - - rte_ether_addr_copy(src_hwaddr, &arp_hdr->arp_data.arp_sha); - memset(&arp_hdr->arp_data.arp_tha, 0, sizeof(arp_hdr->arp_data.arp_tha)); - - // 写数据包到线路 - mrapp_packet_fast_send_burst(vdi, qid, &req_mbuf, 1); - return 0; -}
\ No newline at end of file diff --git a/app/src/icmp.c b/app/src/icmp.c deleted file mode 100644 index 8ea4f0b..0000000 --- a/app/src/icmp.c +++ /dev/null @@ -1,98 +0,0 @@ - -#include <mrapp.h> -#include <rte_icmp.h> -#include <rte_ip.h> -#include <protect.h> -#include <rte_version.h> - -static void icmp_echo_request_entry(struct vdev_instance* vdi, queue_id_t sid, - struct rte_mbuf * mbuf, struct in_addr s_in_addr, struct in_addr d_in_addr) -{ - /* 将原来的报文复制一份 */ - struct rte_mbuf * mbuf_cloned = PROTECT_rte_pktmbuf_alloc(vdi->direct_pool); - PROTECT_rte_mbuf_unpoison(mbuf_cloned); - - char * _pkt_data = rte_pktmbuf_append(mbuf_cloned, rte_pktmbuf_data_len(mbuf)); - rte_memcpy(_pkt_data, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_data_len(mbuf)); - - struct rte_icmp_hdr * icmp_hdr = rte_pktmbuf_mtod_offset(mbuf_cloned, struct rte_icmp_hdr *, - sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)); - - uint32_t cksum; - icmp_hdr->icmp_type = RTE_IP_ICMP_ECHO_REPLY; - cksum = ~icmp_hdr->icmp_cksum & 0xffff; - cksum += ~htons(RTE_IP_ICMP_ECHO_REQUEST << 8) & 0xffff; - cksum += htons(RTE_IP_ICMP_ECHO_REPLY << 8); - cksum = (cksum & 0xffff) + (cksum >> 16); - cksum = (cksum & 0xffff) + (cksum >> 16); - icmp_hdr->icmp_cksum = ~cksum; - - struct rte_ipv4_hdr * ip_hdr = rte_pktmbuf_mtod_offset(mbuf_cloned, - struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr)); - - ip_hdr->src_addr = d_in_addr.s_addr; - ip_hdr->dst_addr = s_in_addr.s_addr; - ip_hdr->hdr_checksum = 0; - ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr); - - struct rte_ether_addr ether_addr_swap; - struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf_cloned, struct rte_ether_hdr *); - -#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION - rte_ether_addr_copy(ð_hdr->src_addr, ðer_addr_swap); - rte_ether_addr_copy(ð_hdr->dst_addr, ð_hdr->src_addr); - rte_ether_addr_copy(ðer_addr_swap, ð_hdr->dst_addr); -#else - rte_ether_addr_copy(ð_hdr->s_addr, ðer_addr_swap); - rte_ether_addr_copy(ð_hdr->d_addr, ð_hdr->s_addr); - rte_ether_addr_copy(ðer_addr_swap, ð_hdr->d_addr); -#endif - - mrapp_packet_fast_send_burst(vdi, 0, &mbuf_cloned, 1); - return; -} - -int icmp_entry(struct mr_instance * instance, struct vdev_instance* vdi, - queue_id_t qid, struct rte_mbuf* mbufs_in[], int nr_mbufs_in) -{ - int handled_packets = 0; - struct in_addr s_in_addr; - struct in_addr d_in_addr; - - for (int i = 0; i < nr_mbufs_in; i++) - { - struct rte_mbuf * mbuf = mbufs_in[i]; - if (unlikely(mbuf == NULL)) continue; - - // 仅处理IPv4报文 - struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *); - if (eth_hdr->ether_type != ntohs(RTE_ETHER_TYPE_IPV4)) continue; - - // 校验IPv4报文类型 - struct rte_ipv4_hdr * ip_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, - sizeof(struct rte_ether_hdr)); - - if (ip_hdr->next_proto_id != IPPROTO_ICMP) continue; - - s_in_addr.s_addr = ip_hdr->src_addr; - d_in_addr.s_addr = ip_hdr->dst_addr; - - struct rte_icmp_hdr * icmp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_icmp_hdr *, - sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr)); - - if (unlikely(vdi->vdev->in_addr.s_addr != ntohl(d_in_addr.s_addr))) - { - continue; - } - - // ICMP回显请求,其他请求不处理 - if (icmp_hdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST) - { - icmp_echo_request_entry(vdi, qid, mbuf, s_in_addr, d_in_addr); - } - - handled_packets++; - } - - return handled_packets; -}
\ No newline at end of file diff --git a/app/src/monit.c b/app/src/monit.c index a169ab7..dd9f600 100644 --- a/app/src/monit.c +++ b/app/src/monit.c @@ -74,7 +74,7 @@ static cJSON * __create_uint64_array(const uint64_t * value, int nr_value) return uint64_array; } -static cJSON * __create_uint64_delta_array(const uint64_t * value_large, +static cJSON * __create_uint64_delta_array(const uint64_t * value_large, const uint64_t * value_small, int nr_value, int interval) { struct cJSON * uint64_array = cJSON_CreateArray(); @@ -262,7 +262,7 @@ do { \ cJSON_AddItemToObject(j_root, #item, __create_uint64_array( \ _trans_app_stat_array(item), instance->nr_dataplane_thread)); \ } while(0) \ - + _json_generate(packet_recv_count); _json_generate(packet_recv_length); _json_generate(packet_send_count); @@ -302,7 +302,7 @@ static cJSON * monit_root(struct mr_instance * instance) struct cJSON * j_monit_neigh = neighbour_manager_monit(instance->neigh); cJSON_AddItemToObject(j_root, "neigh", j_monit_neigh); } - + return j_root; } diff --git a/app/src/mrb.c b/app/src/mrb.c index 90f8f04..355a48c 100644 --- a/app/src/mrb.c +++ b/app/src/mrb.c @@ -12,41 +12,16 @@ void * marsio_buff_ctrlzone(marsio_buff_t * mr_buff, uint8_t id) { return NULL; - - PROTECT_rte_mbuf_unpoison_meta((struct rte_mbuf *)mr_buff); - struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id); - assert(id < mrbuf_cz_num(mr_buff)); - - void * ctrl_addr = mrbuf_cz_data(mr_buff, id); - unsigned int ctrl_len = cz->size; - - MR_ASAN_UNPOISON_MEMORY_REGION(ctrl_addr, ctrl_len); - return ctrl_addr; } void * marsio_buff_ctrlzone_data(marsio_buff_t * mr_buff, uint8_t id, uint8_t * size) { return NULL; - - struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id); - assert(id < mrbuf_cz_num(mr_buff)); - *size = cz->size; - return mrbuf_cz_data(mr_buff, id); } void marsio_buff_ctrlzone_set(marsio_buff_t * mr_buff, uint8_t id, void * ptr_data, uint8_t size) { return; - - PROTECT_rte_mbuf_unpoison_meta((struct rte_mbuf *)mr_buff); - struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id); - RTE_SET_USED(cz); - - assert(id < mrbuf_cz_num(mr_buff)); - assert(size <= cz->size); - - MR_ASAN_UNPOISON_MEMORY_REGION(mrbuf_cz_data(mr_buff, id), cz->size); - memcpy(mrbuf_cz_data(mr_buff, id), ptr_data, size); } void marsio_buff_ctrlzone_reset(marsio_buff_t * mr_buff) @@ -72,12 +47,6 @@ void marsio_buff_ctrlzone_reset(marsio_buff_t * mr_buff) void * mr_buffer_ctrlzone(struct rte_mbuf * mr_buff, uint8_t id) { return NULL; - - assert(id < mrbuf_cz_num(mr_buff)); - struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id); - - MR_ASAN_UNPOISON_MEMORY_REGION(mrbuf_cz_data(mr_buff, id), cz->size); - return mrbuf_cz_data(mr_buff, id); } static struct rte_mempool_cache * mempool_cache_get(struct mr_instance * instance, struct rte_mempool * mp) @@ -372,7 +341,6 @@ static void __buff_clone_ctrlzone(marsio_buff_t * mc, marsio_buff_t * md) /* 断言,__mc和__mi同时到达链表的终点,否则说明链表克隆时出现了问题。 */ assert(__mc == NULL && __md == NULL); - return; } static void __memcpy_operator_buff(struct rte_mbuf * __mi, struct rte_mbuf * __md) @@ -650,7 +618,6 @@ int marsio_buff_is_ctrlbuf(marsio_buff_t * m) { struct rte_mbuf * mbuf = (struct rte_mbuf *)m; struct mrb_metadata * mrb_metadata = (struct mrb_metadata *)mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID); - return mrb_metadata->is_ctrlbuf; } @@ -659,8 +626,6 @@ void marsio_buff_set_ctrlbuf(marsio_buff_t * m) struct rte_mbuf * mbuf = (struct rte_mbuf *)m; struct mrb_metadata * mrb_metadata = (struct mrb_metadata *)mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID); mrb_metadata->is_ctrlbuf = 1; - - return; } void marsio_pktmbuf_dump(FILE * f, const marsio_buff_t * m, unsigned dump_len) diff --git a/app/src/neigh.c b/app/src/neigh.c deleted file mode 100644 index 769031d..0000000 --- a/app/src/neigh.c +++ /dev/null @@ -1,317 +0,0 @@ -/* \brief 简单协议栈邻居子系统 -* -* 处理与主机邻接通信的所需的信息 -* -* \author Lu Qiuwen<[email protected]> -* \date 2016-10-21 -*/ - - -#include <rte_hash.h> -#include <rte_rwlock.h> -#include <rte_atomic.h> -#include <rte_ether.h> -#include <rte_ip_frag.h> -#include <rte_timer.h> -#include <rte_hash_crc.h> -#include <rte_errno.h> -#include <errno.h> -#include <assert.h> -#include <common.h> -#include <neigh.h> -#include <vdev_define.h> -#include <arp.h> -#include <cJSON.h> -#include <arpa/inet.h> - -#include <MESA_htable.h> - -enum neigh_state -{ - NEIGH_IN_NONE = 0, - NEIGH_IN_COMPLETE = 1, - NEIGH_IN_USE = 2, - NEIGH_IN_TIMEOUT = 3, - NEIGH_IN_DEAD = 4 -}; - -static const char * str_neigh_state[] = -{ - [NEIGH_IN_NONE] = "IN_NONE", - [NEIGH_IN_COMPLETE] = "IN_COMPLETE", - [NEIGH_IN_USE] = "IN_USE", - [NEIGH_IN_TIMEOUT] = "IN_TIMEOUT", - [NEIGH_IN_DEAD] = "IN_DEAD" -}; - -/* Neighbour Table Object Rule */ -struct neighbour -{ - /* 链表项 */ - TAILQ_ENTRY(neighbour) next; - /* IP地址 */ - struct in_addr in_addr; - /* 目的MAC地址 */ - struct rte_ether_addr eth_addr; - /* 设备描述符 */ - struct vdev_instance * vdi; - /* 表项状态 */ - enum neigh_state state; - /* 转入当前状态的时间 */ - time_t t_start_state; - /* 上次发送ARP请求的时间 */ - time_t t_last_request; - /* 上次收到ARP应答的时间 */ - time_t t_last_update; - /* 永久标志位 */ - unsigned short en_permanent; -}; - -int neighbour_mamanger_init(struct neighbour_manager * object, - const char * symbol, unsigned int max_entries, unsigned short t_timeout, unsigned int t_arp_send) -{ - const static unsigned int __opt_disable = 0; - - object->table = MESA_htable_born(); - if(unlikely(object->table == NULL)) - { - MR_INFO("Cannot born MESA_htable(), the ret is NULL. "); - goto errout; - } - - /* 不使用MESA_htable的线程安全特性,在外层用自旋锁保证线程安全 - * TODO: 使用MESA_htable的线程安全特性,并增加超时淘汰等特性 */ - MESA_htable_set_opt(object->table, MHO_THREAD_SAFE, (void *)&__opt_disable, sizeof(__opt_disable)); - MESA_htable_set_opt(object->table, MHO_HASH_MAX_ELEMENT_NUM, &max_entries, sizeof(max_entries)); - - int ret = MESA_htable_mature(object->table); - if(ret < 0) - { - MR_INFO("Cannot mature MESA_htable(), the ret is %d. ", ret); - goto errout; - } - - TAILQ_INIT(&object->in_none_list); - rte_atomic16_init(&object->in_none_list_len); - rte_spinlock_init(&object->lock); - - object->t_arp_send = (unsigned short)t_arp_send; - object->t_timeout = t_timeout; - return 0; - -errout: - if (object->table != NULL) MESA_htable_destroy(object->table, NULL); - return -1; -} - -int neighbour_mamanger_deinit(struct neighbour_manager * object) -{ - MESA_htable_destroy(object->table, NULL); - return 0; -} - -static void __change_neigh_state(struct neighbour * neigh, enum neigh_state target_state) -{ - if (target_state != neigh->state) - { - neigh->state = target_state; - neigh->t_start_state = time(NULL); - } -} - -static struct neighbour * __neigh_create_and_join_hash(struct neighbour_manager * object, - struct in_addr in_addr, struct rte_ether_addr * eth_addr, - struct vdev_instance * vdi, unsigned short en_permanent) -{ - struct neighbour * neigh = malloc(sizeof(struct neighbour)); - memset(neigh, 0, sizeof(struct neighbour)); - - neigh->in_addr = in_addr; - neigh->vdi = vdi; - neigh->en_permanent = en_permanent; - neigh->t_last_update = 0; - neigh->t_last_request = 0; - neigh->state = NEIGH_IN_NONE; - - if(eth_addr != NULL) - { - rte_ether_addr_copy(eth_addr, &neigh->eth_addr); - __change_neigh_state(neigh, NEIGH_IN_USE); - } - - if(en_permanent) - { - neigh->en_permanent = 1; - } - - int ret = MESA_htable_add(object->table, (const unsigned char *)&in_addr, sizeof(in_addr), neigh); - if (unlikely(ret < 0)) - { - MR_ERROR("Insert neigh to hash table failed, ret = %d", ret); - goto errout; - } - - return neigh; - -errout: - if (neigh) free(neigh); - return NULL; -} - -static void __neigh_update(struct neighbour * neigh, struct in_addr in_addr, - struct rte_ether_addr * eth_addr, struct vdev_instance * vdi, unsigned short en_permanent) -{ - assert(neigh != NULL); - neigh->in_addr = in_addr; - neigh->en_permanent = en_permanent; - neigh->vdi = vdi; - - if(eth_addr != NULL) - { - rte_ether_addr_copy(eth_addr, &neigh->eth_addr); - __change_neigh_state(neigh, NEIGH_IN_USE); - } - - return; -} - -int neigh_create_or_update(struct neighbour_manager * object, struct in_addr in_addr, - struct rte_ether_addr * eth_addr, struct vdev_instance * vdi, unsigned short en_permanent) -{ - rte_spinlock_lock(&object->lock); - struct neighbour * neigh = MESA_htable_search(object->table, (const unsigned char *)&in_addr, sizeof(in_addr)); - if (unlikely(neigh == NULL)) - { - neigh = __neigh_create_and_join_hash(object, in_addr, eth_addr, vdi, en_permanent); - } - else - { - __neigh_update(neigh, in_addr, eth_addr, vdi, en_permanent); - } - - int ret = 0; - if (unlikely(neigh == NULL)) { ret = -1; goto exit; } - - time_t now_time = time(NULL); - if (neigh->state == NEIGH_IN_NONE && (now_time - neigh->t_last_request) >= object->t_arp_send) - { - arp_request_send(vdi, 0, neigh->in_addr); - neigh->t_last_request = now_time; - } - - ret = 0; goto exit; - -exit: - rte_spinlock_unlock(&object->lock); - return ret; -} - -int neigh_delete(struct neighbour_manager * object, struct in_addr in_addr) -{ - rte_spinlock_lock(&object->lock); - struct neighbour * neigh = MESA_htable_search(object->table, (const unsigned char *)&in_addr, sizeof(in_addr)); - if (neigh == NULL) goto exit; - - // 当处于IN_NONE状态时,从IN_NONE列表中移除 - if (neigh->state == NEIGH_IN_NONE) - { - TAILQ_REMOVE(&object->in_none_list,neigh, next); - rte_atomic16_dec(&object->in_none_list_len); - } - - MESA_htable_del(object->table, (const unsigned char *)&in_addr, sizeof(in_addr), NULL); - return 0; - -exit: - rte_spinlock_unlock(&object->lock); - return -1; -} - -int neigh_query(struct neighbour_manager * object, struct in_addr in_addr, - struct rte_ether_addr * out_ether_addr, struct vdev_instance ** vdi) -{ - rte_spinlock_lock(&object->lock); - struct neighbour * neigh = MESA_htable_search(object->table, (const unsigned char *)&in_addr, sizeof(in_addr)); - - // 没查到 - int ret = 0; - if (neigh == NULL) goto exit; - - // 以下状态信息不全,无法对外提供查询 - if (neigh->state == NEIGH_IN_NONE || - neigh->state == NEIGH_IN_COMPLETE || - neigh->state == NEIGH_IN_DEAD) - { - ret = -EFAULT; goto exit; - } - - rte_ether_addr_copy(&neigh->eth_addr, out_ether_addr); - *vdi = neigh->vdi; - ret = 0; goto exit; - -exit: - rte_spinlock_unlock(&object->lock); - return ret; -} - -static void __neigh_handle_in_no_queue(struct neighbour_manager * object, queue_id_t qid) -{ - if (rte_atomic16_read(&object->in_none_list_len) == 0) return; - - rte_spinlock_lock(&object->lock); - struct neighbour * neigh_iter; - - // 对每一个在In-None列表中的Neigh,发ARP请求。 - TAILQ_FOREACH(neigh_iter, &object->in_none_list, next) - { - assert(neigh_iter->vdi != NULL); - arp_request_send(neigh_iter->vdi, qid, neigh_iter->in_addr); - - // 处理完毕,移除这一项,因哈希表中还有引用,不释放空间 - TAILQ_REMOVE(&object->in_none_list, neigh_iter, next); - rte_atomic16_dec(&object->in_none_list_len); - } - - // In-None列表应当为空,数量计数应当为0 - assert(rte_atomic16_read(&object->in_none_list_len) == 0); - rte_spinlock_unlock(&object->lock); -} - -void neighbour_manager_loop_entry(struct neighbour_manager * object, queue_id_t qid) -{ - __neigh_handle_in_no_queue(object, qid); -} - -void __neighbour_manager_monit_iterate(const uchar * key, uint size, void * data, void *user) -{ - struct neighbour * iter_neigh = (struct neighbour *)data; - struct cJSON * j_root = (struct cJSON *)user; - assert(iter_neigh != NULL && j_root != NULL); - - char str_in_addr[MR_STRING_MAX] = { 0 }; - inet_ntop(AF_INET, &iter_neigh->in_addr, str_in_addr, INET_ADDRSTRLEN); - - char str_ether_addr[MR_STRING_MAX] = { 0 }; - rte_ether_format_addr(str_ether_addr, sizeof(str_ether_addr), &iter_neigh->eth_addr); - - cJSON * j_neigh = cJSON_CreateObject(); - cJSON_AddStringToObject(j_neigh, "InAddress", str_in_addr); - cJSON_AddStringToObject(j_neigh, "HWAddress", str_ether_addr); - cJSON_AddStringToObject(j_neigh, "Interface", iter_neigh->vdi->vdev->symbol); - cJSON_AddStringToObject(j_neigh, "State", str_neigh_state[iter_neigh->state]); - - cJSON_AddItemToArray(j_root, j_neigh); - (void)key; - (void)size; -} - -cJSON * neighbour_manager_monit(struct neighbour_manager * object) -{ - struct cJSON * j_root = cJSON_CreateArray(); - MR_VERIFY_MALLOC(j_root); - - rte_spinlock_lock(&object->lock); - MESA_htable_iterate(object->table, __neighbour_manager_monit_iterate, j_root); - rte_spinlock_unlock(&object->lock); - return j_root; -}
\ No newline at end of file diff --git a/app/src/rawio.c b/app/src/rawio.c index 6876c1f..e3f9f58 100644 --- a/app/src/rawio.c +++ b/app/src/rawio.c @@ -13,7 +13,6 @@ #include <sys/epoll.h> #include <sys/eventfd.h> #include <unistd.h> -#include <mrb_define.h> static inline unsigned int packet_total_len(struct rte_mbuf * mbufs[], unsigned int nr_mbufs) { @@ -23,10 +22,9 @@ static inline unsigned int packet_total_len(struct rte_mbuf * mbufs[], unsigned return total_len; } -int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs) +int mrapp_packet_fast_send_burst(struct mr_instance * instance, + struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs) { - PROTECT_rte_mbuf_unpoison_bulk(mbufs, nr_mbufs); - hash_t mbufs_hash[MR_BURST_MAX]; for (int i = 0; i < nr_mbufs; i++) { @@ -40,12 +38,19 @@ int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid, str */ // TODO: 锁换一个位置 - static rte_spinlock_t __f_fast_lock = {0}; + static rte_spinlock_t _f_fast_lock = {0}; PROTECT_rte_mbuf_poison_bulk(mbufs, nr_mbufs); - rte_spinlock_lock(&__f_fast_lock); - int ret = vnode_mirror_enqueue_bulk(vdi->vnode_ftx_prod, qid, mbufs, mbufs_hash, nr_mbufs); - rte_spinlock_unlock(&__f_fast_lock); + rte_spinlock_lock(&_f_fast_lock); + vnode_mirror_enqueue_bulk(vdi->vnode_ftx_prod, qid, mbufs, mbufs_hash, nr_mbufs); + + int ret = vnode_mirror_rt_object_retrieve(vdi->vnode_ftx_prod, qid, mbufs, nr_mbufs); + if (ret > 0) + { + marsio_buff_free_v2(instance, (marsio_buff_t **)mbufs, ret); + } + + rte_spinlock_unlock(&_f_fast_lock); return ret; } @@ -119,6 +124,8 @@ int marsio_recv_all_burst(struct mr_instance * instance, queue_id_t qid, marsio_ int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid) { struct mr_vdev_tx_buffer * tx_buffer = vdev->tx_buffer[sid]; + struct mr_instance * instance = vdev->instance; + if (tx_buffer->length == 0) { return 0; @@ -127,20 +134,22 @@ int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid) hash_t hash[MR_BURST_MAX]; for (int i = 0; i < tx_buffer->length; i++) { - //uint16_t hash_txq = tx_buffer->mbufs[i]->hash.txadapter.txq; - //uint32_t hash_usr = tx_buffer->mbufs[i]->hash.usr; - - /* if the hash_qid's highest bit is set, use the txq */ -#if 1 - //hash[i] = hash_txq & 0x8000 ? hash_txq & 0x7FFF : hash_usr; hash[i] = tx_buffer->mbufs[i]->hash.usr; -#else - /* round-robin */ - hash[i] = round_robin_counter++; -#endif } int ret = vnode_mirror_enqueue_bulk(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs, hash, (int)tx_buffer->length); + if (ret < 0) + { + return ret; + } + + /* free the mbufs backpressure from service */ + ret = vnode_mirror_rt_object_retrieve(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs, (int)tx_buffer->length); + if (ret > 0) + { + marsio_buff_free_v2(instance, (marsio_buff_t **)tx_buffer->mbufs, ret); + } + tx_buffer->length = 0; return ret; } @@ -176,21 +185,21 @@ int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff_t * mbufs[], int nr_mbufs, uint16_t options) { - struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs; - PROTECT_rte_mbuf_unpoison_bulk(__mbufs, nr_mbufs); + struct rte_mbuf ** _mbufs = (struct rte_mbuf **)mbufs; + PROTECT_rte_mbuf_unpoison_bulk(_mbufs, nr_mbufs); assert(sid < sendpath->target_vdi->nr_txstream); for (int i = 0; i < nr_mbufs; i++) { - assert(__mbufs[i]->data_len != 0 && __mbufs[i]->pkt_len != 0); + assert(_mbufs[i]->data_len != 0 && _mbufs[i]->pkt_len != 0); __rte_mbuf_sanity_check(mbufs[i], i); } if (options & MARSIO_SEND_OPT_NO_FREE) { for (int i = 0; i < nr_mbufs; i++) - rte_pktmbuf_refcnt_update(__mbufs[i], 1); + rte_pktmbuf_refcnt_update(_mbufs[i], 1); } if (options & MARSIO_SEND_OPT_REHASH) @@ -202,7 +211,7 @@ int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid hash_t hash[MR_BURST_MAX]; for (int i = 0; i < nr_mbufs; i++) { - hash[i] = __mbufs[i]->hash.usr; + hash[i] = _mbufs[i]->hash.usr; } /* 线程运行情况统计 */ @@ -210,10 +219,16 @@ int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid { thread_id_t tid = thread_info.thread_id; thread_info.instance->stat[tid].packet_send_count += nr_mbufs; - thread_info.instance->stat[tid].packet_send_length = packet_total_len(__mbufs, nr_mbufs); + thread_info.instance->stat[tid].packet_send_length = packet_total_len(_mbufs, nr_mbufs); + } + + vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, _mbufs, hash, nr_mbufs); + int ret = vnode_mirror_rt_object_retrieve(sendpath->target_vdi->vnode_tx_prod, sid, _mbufs, nr_mbufs); + if (ret > 0) + { + marsio_buff_free_v2(sendpath->instance, mbufs, ret); } - vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, __mbufs, hash, nr_mbufs); return RT_SUCCESS; } @@ -275,7 +290,7 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un return 0; } - /* set the notify status to sleep */ + /* set the notification status to sleep */ for (unsigned int i = 0; i < nr_vdevs; i++) { struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify; @@ -303,7 +318,7 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un clear_all_zero_recv_counters(vdevs, nr_vdevs, tid); /* read the data in pipe, and drop it */ - /* handle the read event, read the packet then redirect to shmdev queues */ + /* handle the read event, read the packet, then redirect to shmdev queues */ for (int i = 0; i < n; i++) { struct rte_epoll_event * epoll_event = &epoll_events[i]; diff --git a/app/src/sendpath.c b/app/src/sendpath.c index 5306132..453be71 100644 --- a/app/src/sendpath.c +++ b/app/src/sendpath.c @@ -105,245 +105,9 @@ static struct mr_sendpath * sendpath_vdev_create(struct mr_instance * instance, return &_sendpath->_father; } -/* ======================================================================================= */ - -static void sendpath_route_destory(struct mr_sendpath * sendpath) -{ - struct __mr_sendpath_route * _sendpath = container_of(sendpath, - struct __mr_sendpath_route, _father); - - free(_sendpath); -} - -static int sendpath_route_l2_construct(struct mr_sendpath * sendpath, - struct rte_mbuf * mbuf[], unsigned int nr_mbuf) -{ - struct __mr_sendpath_route * _sendpath = container_of(sendpath, - struct __mr_sendpath_route, _father); - - for (int i = 0; i < nr_mbuf; i++) - { - struct rte_ether_hdr * ether_hdr = (struct rte_ether_hdr *)rte_pktmbuf_prepend(mbuf[i], - sizeof(struct rte_ether_hdr)); - - MR_VERIFY_2(ether_hdr != NULL, "Not enough space for ethernet header in mbufs. "); - -#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION - ether_hdr->src_addr = _sendpath->src_eth_addr; - ether_hdr->dst_addr = _sendpath->dst_eth_addr; -#else - ether_hdr->s_addr = _sendpath->src_eth_addr; - ether_hdr->d_addr = _sendpath->dst_eth_addr; -#endif - ether_hdr->ether_type = htons(RTE_ETHER_TYPE_IPV4); - } - - return 0; -} - -static int sendpath_route_l3_construct(struct mr_sendpath * sendpath, - struct rte_mbuf * mbuf[], unsigned int nr_mbuf) -{ - struct __mr_sendpath_route * _sendpath = container_of(sendpath, - struct __mr_sendpath_route, _father); - - for (int i = 0; i < nr_mbuf; i++) - { - struct rte_ipv4_hdr * ip_hdr = rte_pktmbuf_mtod(mbuf[i], struct rte_ipv4_hdr *); - ip_hdr->src_addr = _sendpath->src_addr.s_addr; - ip_hdr->dst_addr = _sendpath->dst_addr.s_addr; - - ip_hdr->hdr_checksum = 0; - ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr); - } - - return 0; -} - -static int sendpath_route_requery(struct mr_sendpath * sendpath) -{ - struct mr_instance * instance = sendpath->instance; - struct __mr_sendpath_route * _sendpath = container_of(sendpath, - struct __mr_sendpath_route, _father); - - int ret = neigh_query(instance->neigh, _sendpath->target_addr, - &_sendpath->dst_eth_addr, &sendpath->target_vdi); - - if (ret < 0) - { - neigh_create_or_update(instance->neigh, _sendpath->target_addr, NULL, - sendpath->target_vdi, 0); return RT_ERR; - } - else - { - sendpath->can_use = 1; - return RT_SUCCESS; - } - - return RT_SUCCESS; -} - -static int sendpath_route_option_set(struct mr_instance * instance, struct mr_sendpath * sendpath, - int opt, va_list va_list) -{ - if (opt == MR_SENDPATH_OPT_BUILD_L2) - { - unsigned int enable = va_arg(va_list, unsigned int); - sendpath->fn_l2_construct = enable ? sendpath_route_l2_construct : NULL; - } - else if (opt == MR_SENDPATH_OPT_BUILD_L3) - { - unsigned int enable = va_arg(va_list, unsigned int); - sendpath->fn_l3_construct = enable ? sendpath_route_l3_construct : NULL; - } - else if (opt == MR_SENDPATH_OPT_HOOK_PREBUILD) - { - sendpath->fn_prebuild_hook = va_arg(va_list, void *); - sendpath->prebuild_hook_args = va_arg(va_list, void *); - } - else if (opt == MR_SENDPATH_OPT_HOOK_POSTBUILD) - { - sendpath->fn_postbuild_hook = va_arg(va_list, void *); - sendpath->postbuild_hook_args = va_arg(va_list, void *); - } - else - { - MR_ERROR("Invalided opt type in %s()", __FUNCTION__); - return -EINVAL; - } - - return 0; -} - -static int sendpath_route_option_get(struct mr_instance * instance, struct mr_sendpath * sendpath, - int opt, va_list va_list) -{ - return 0; -} - static struct mr_sendpath * sendpath_route_create(struct mr_instance * instance, int type, va_list va_list) { - struct mr_vdev * target_vdev = NULL; - struct in_addr in_addr; - - /* 对于一般的路由,传入目标地址 */ - if (type == MR_SENDPATH_ROUTE_NORMAL) - { - target_vdev = NULL; - in_addr = va_arg(va_list, struct in_addr); - } - - /* 对于指定出口设备的路由,传入(1)设备句柄(2)目标IP地址 */ - else if (type == MR_SENDPATH_ROUTE_SPEC_DEV) - { - target_vdev = va_arg(va_list, struct mr_vdev *); - in_addr = va_arg(va_list, struct in_addr); - } - else - { - assert(0); - } - - struct in_addr target_in_addr = { INADDR_NONE }; - - /* 查找合适的设备 */ - if (target_vdev != NULL) goto _build; - - for (int i = 0; i < instance->nr_vdevs; i++) - { - struct mr_vdev * vdev = &instance->vdevs[i]; - struct vdev * __vdev = vdev->vdi->vdev; - - if (!is_same_subnet(in_addr, __vdev->in_addr, __vdev->in_mask)) - continue; - - target_in_addr = in_addr; - target_vdev = vdev; - } - -_build: - - /* 没有找到设备,说明没有路由 */ - if (target_vdev == NULL) - { - char str_in_addr[MR_STRING_MAX] = { 0 }; - inet_ntop(AF_INET, &in_addr, str_in_addr, INET_ADDRSTRLEN); - - MR_ERROR("No route to address %s, creating route sendpath failed. ", - str_in_addr); return NULL; - } - - struct vdev * __target_vdev = target_vdev->vdi->vdev; - - /* 判断目标IP地址是否属于网卡所在的网段,如不在,走网关 */ - if (is_same_subnet(in_addr, __target_vdev->in_addr, __target_vdev->in_mask)) - { - target_in_addr = in_addr; - } - else - { - target_in_addr = __target_vdev->in_gateway; - } - - /* 目的地址不合法,返回 */ - if (target_in_addr.s_addr == htonl(INADDR_ANY) || - target_in_addr.s_addr == htonl(INADDR_LOOPBACK) || - target_in_addr.s_addr == htonl(INADDR_NONE)) - { - char str_target_in_addr[MR_STRING_MAX] = { 0 }; - char str_in_addr[MR_STRING_MAX] = { 0 }; - - inet_ntop(AF_INET, &target_in_addr, str_target_in_addr, INET_ADDRSTRLEN); - inet_ntop(AF_INET, &in_addr, str_in_addr, INET_ADDRSTRLEN); - - MR_WARNING("Invalid target ip address %s(or next hop address), " - "creating route sendpath for %s failed. ", str_target_in_addr, - str_in_addr); return NULL; - } - - struct __mr_sendpath_route * _sendpath = malloc(sizeof(struct __mr_sendpath_route)); - memset(_sendpath, 0, sizeof(struct __mr_sendpath_route)); - - struct mr_sendpath * sendpath = &_sendpath->_father; - - /* 填充SendPath各虚函数指针*/ - sendpath->instance = target_vdev->instance; - sendpath->target_vdi = target_vdev->vdi; - sendpath->vdev = target_vdev; - - sendpath->fn_requery = sendpath_route_requery; - sendpath->fn_l2_construct = sendpath_route_l2_construct; - sendpath->fn_l3_construct = sendpath_route_l3_construct; - sendpath->fn_l4_construct = NULL; - - sendpath->fn_option_get = sendpath_route_option_get; - sendpath->fn_option_set = sendpath_route_option_set; - sendpath->fn_destory = sendpath_route_destory; - - _sendpath->src_addr = sendpath->target_vdi->vdev->in_addr; - _sendpath->dst_addr = in_addr; - _sendpath->target_addr = target_in_addr; - rte_ether_addr_copy(&__target_vdev->ether_addr, &_sendpath->src_eth_addr); - - /* 查ARP表 */ - int ret = neigh_query(target_vdev->instance->neigh, target_in_addr, - &_sendpath->dst_eth_addr, &sendpath->target_vdi); - - /* 没有查询成功,保存起来下一次再查 */ - if (ret < 0) - { - neigh_create_or_update(target_vdev->instance->neigh, target_in_addr, - NULL, target_vdev->vdi, 0); sendpath->can_use = 0; - } - else - { - sendpath->can_use = 1; - } - - MR_DEBUG("Sendpath created: type=%d, in_addr=%u, target_in_addr=%u", type, - in_addr.s_addr, target_in_addr.s_addr); - - return sendpath; + return NULL; } /* ======================================================================================== */ diff --git a/app/src/tap.c b/app/src/tap.c index edc41a3..12994ad 100644 --- a/app/src/tap.c +++ b/app/src/tap.c @@ -331,7 +331,7 @@ static void * tap_representor_poll_thread_entry(void * arg) goto errout; } - /* handle the read event, read the packet then redirect to shmdev queues */ + /* handle the read event, read the packet, then redirect to shmdev queues */ for (int i = 0; i < n; i++) { struct tap_device * tap_dev = (struct tap_device *)(epoll_events[i].epdata.data); @@ -345,10 +345,10 @@ static void * tap_representor_poll_thread_entry(void * arg) continue; } - ret = mrapp_packet_fast_send_burst(vdev->vdi, 0, (struct rte_mbuf **)buff, RTE_DIM(buff)); + ret = mrapp_packet_fast_send_burst(mr_instance, vdev->vdi, 0, (struct rte_mbuf **)buff, RTE_DIM(buff)); if (unlikely(ret < 0)) { - marsio_buff_free(mr_instance, buff, RTE_DIM(buff), MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY); + marsio_buff_free_v2(mr_instance, buff, RTE_DIM(buff)); } } } diff --git a/include/external/marsio.h b/include/external/marsio.h index c993d74..4ed5025 100644 --- a/include/external/marsio.h +++ b/include/external/marsio.h @@ -265,6 +265,8 @@ int marsio_buff_malloc_global(struct mr_instance * instance, marsio_buff_t * mar void marsio_buff_free(struct mr_instance * instance, marsio_buff_t * marsio_buff[], unsigned int nr_mbufs, int socket_id, int thread_id); +void marsio_buff_free_v2(struct mr_instance * instance, marsio_buff_t * buff[], unsigned int nr_buffs); + void marsio_buff_do_rehash(struct mr_instance * mr_instance, marsio_buff_t * m); int marsio_buff_is_ctrlbuf(marsio_buff_t * m); diff --git a/infra/include/vnode.h b/infra/include/vnode.h index 557fb76..95dd8e5 100644 --- a/infra/include/vnode.h +++ b/infra/include/vnode.h @@ -108,22 +108,23 @@ struct vnode; struct vnode_prod; struct vnode_cons; -int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, - unsigned int prodq, struct rte_mbuf * objects[], uint32_t hash[], int nr_objects); +int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[], + uint32_t hash[], unsigned int nr_objects); -int vnode_mirror_dequeue_burst(struct vnode_cons * cons, - unsigned int consq, struct rte_mbuf * objects[], int nr_max_objects); +int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[], + int nr_max_objects); -struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_shared, - unsigned int sz_buffer, unsigned int notify_cons_when_rx, - unsigned int batch_interval_us, unsigned int en_q_len_monitor); +int vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[], + unsigned int nr_max_objects); + +struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_buffer, + unsigned int notify_cons_when_rx, unsigned int batch_interval_us, + unsigned int en_q_len_monitor); int vnode_mirror_delete(struct vnode * vnode); void vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq); -void vnode_mirror_common_unpoison(struct vnode * vnode); - __DECLARE_COMMON_VNODE_CREATE_PROD(mirror) __DECLARE_COMMON_VNODE_CREATE_CONS(mirror) __DECLARE_COMMON_VNODE_DELETE_PROD(mirror) diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index 06a8513..67e019e 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -1,26 +1,26 @@ /* Virtual Data Deliver Node - Base Class Author : Lu Qiuwen<[email protected]> - Zheng Chao<[email protected]> + Zheng Chao<[email protected]> Date : 2016-12-10 TODO: Try to use RCU in thread safe mode. */ +#include <rte_branch_prediction.h> +#include <rte_cycles.h> #include <rte_log.h> #include <rte_malloc.h> -#include <rte_branch_prediction.h> #include <rte_ring.h> -#include <rte_cycles.h> -#include <string.h> +#include <assert.h> +#include <fcntl.h> +#include <math.h> #include <stdio.h> #include <stdlib.h> -#include <assert.h> -#include <unistd.h> +#include <string.h> #include <sys/eventfd.h> -#include <math.h> -#include <fcntl.h> +#include <unistd.h> #include <common.h> #include <rte_mbuf.h> @@ -38,33 +38,34 @@ * | nr_prodq | nr_consq | descs[0][0] | desc[0][1] | .......... | * +-------------+-------------+-------------+------------+------------+ * Len = sizeof(nr_prodq) + sizeof(nr_consq) + sizeof(descs) * nr_prodq * nr_consq -*/ + */ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_exclusive, unsigned int sz_shared, - unsigned int sz_buffer, rte_atomic32_t * shared_counter) + unsigned int sz_buffer) { struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc)); MR_VERIFY_MALLOC(desc); - desc->tunnel_object = - rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ | RING_F_EXACT_SZ); + desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, + RING_F_SC_DEQ | RING_F_SP_ENQ); if (desc->tunnel_object == NULL) { - MR_ERROR("Create tunnel %s failed : %s", symbol, MR_STR_ERRNO(errno)); + MR_ERROR("Create tunnel %s failed : %s", symbol, strerror(errno)); goto errout; } snprintf(desc->symbol, sizeof(desc->symbol), "%s", symbol); desc->tunnel_size = sz_exclusive + sz_shared; - desc->tunnel_exclusive_size = sz_exclusive; - desc->shared_credict_counter = shared_counter; - desc->shared_credict_used = 0; - desc->en_buffer = ZMALLOC(sizeof(void *) * MR_LIBVNODE_MAX_SZ_BURST); MR_VERIFY_MALLOC(desc->en_buffer); + + desc->rt_buffer = ZMALLOC(sizeof(void *) * MR_LIBVNODE_MAX_SZ_BURST); + MR_VERIFY_MALLOC(desc->rt_buffer); + desc->sz_en_buffer = MR_LIBVNODE_MAX_SZ_BURST; + desc->sz_rt_buffer = MR_LIBVNODE_MAX_SZ_BURST; #if VNODE_CHECK_THREAD_SAFE rte_spinlock_init(&desc->lock_thread_safe_check); @@ -84,6 +85,11 @@ errout: FREE(&desc->en_buffer); } + if (desc->rt_buffer != NULL) + { + FREE(&desc->rt_buffer); + } + FREE(&desc); return NULL; } @@ -108,48 +114,45 @@ static int tunnel_delete(struct tunnel_desc * desc) /* Delete a block of tunnels */ static int tunnel_block_delete(struct tunnel_block * block) { - for (int prodq_id = 0; prodq_id < block->nr_prodq; prodq_id++) - { - for (int consq_id = 0; consq_id < block->nr_consq; consq_id++) - { - if (*tunnel_block_locate(block, prodq_id, consq_id) != NULL) - tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id)); - } - } - - rte_free(block); - return 0; + for (int prodq_id = 0; prodq_id < block->nr_prodq; prodq_id++) + { + for (int consq_id = 0; consq_id < block->nr_consq; consq_id++) + { + if (*tunnel_block_locate(block, prodq_id, consq_id) != NULL) + tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id)); + } + } + + rte_free(block); + return 0; } /* Alloc a block of tunnels, and init all the tunnels */ static struct tunnel_block * tunnel_block_new(const char * symbol, struct vnode_prod * prod, struct vnode_cons * cons, - unsigned int sz_exclusive, unsigned int sz_shared, - unsigned int sz_buffer, rte_atomic32_t * shared_counter) + unsigned int sz_exclusive, unsigned int sz_shared, unsigned int sz_buffer) { - unsigned int nr_prodq = prod->nr_prodq; - unsigned int nr_consq = cons->nr_consq; + unsigned int nr_prodq = prod->nr_prodq; + unsigned int nr_consq = cons->nr_consq; - unsigned int block_size = sizeof(struct tunnel_block) + - sizeof(struct tunnel_desc *) * (nr_prodq * nr_consq); + unsigned int block_size = sizeof(struct tunnel_block) + sizeof(struct tunnel_desc *) * (nr_prodq * nr_consq); - struct tunnel_block * block = (struct tunnel_block *) - rte_zmalloc(NULL, block_size, 0); + struct tunnel_block * block = (struct tunnel_block *)rte_zmalloc(NULL, block_size, 0); - MR_VERIFY_MALLOC(block); - block->cons = cons; - block->prod = prod; - block->nr_consq = nr_consq; - block->nr_prodq = nr_prodq; + MR_VERIFY_MALLOC(block); + block->cons = cons; + block->prod = prod; + block->nr_consq = nr_consq; + block->nr_prodq = nr_prodq; - // create tunnel for each prodq and consq in block - for (int prodq_id = 0; prodq_id < nr_prodq; prodq_id++) + // create tunnel for each prodq and consq in block + for (int prodq_id = 0; prodq_id < nr_prodq; prodq_id++) { for (int consq_id = 0; consq_id < nr_consq; consq_id++) { char tunnel_sym[MR_SYMBOL_MAX]; snprintf(tunnel_sym, sizeof(tunnel_sym), "%s-%d-%d", symbol, prodq_id, consq_id); - struct tunnel_desc * tdesc = tunnel_new(tunnel_sym, sz_exclusive, sz_shared, sz_buffer, shared_counter); + struct tunnel_desc * tdesc = tunnel_new(tunnel_sym, sz_exclusive, sz_shared, sz_buffer); if (tdesc == NULL) goto err; @@ -157,13 +160,13 @@ static struct tunnel_block * tunnel_block_new(const char * symbol, struct vnode_ } } - return block; + return block; err: - MR_ERROR("Create tunnel block %s failed, tunnel size = %d, tunnel buffer = %d", - symbol, sz_exclusive, sz_buffer); - if (block) tunnel_block_delete(block); - return NULL; + MR_ERROR("Create tunnel block %s failed, tunnel size = %d, tunnel buffer = %d", symbol, sz_exclusive, sz_buffer); + if (block) + tunnel_block_delete(block); + return NULL; } static int do_producer_join_unsafe(struct vnode * vnode, struct vnode_prod * prod) @@ -177,8 +180,7 @@ static int do_producer_join_unsafe(struct vnode * vnode, struct vnode_prod * pro snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); // create communication tunnel for each cons and prods - block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer, - &vnode->shared_credict_counter); + block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer); if (block == NULL) { @@ -206,11 +208,7 @@ static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * con { char block_sym[MR_SYMBOL_MAX]; snprintf(block_sym, sizeof(block_sym) - 1, "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); - - // create a communication tunnel for all cons and prods - rte_atomic32_t * shared_credict_counter = vnode->sz_shared > 0 ? &vnode->shared_credict_counter : NULL; - block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer, - shared_credict_counter); + block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer); if (block == NULL) { @@ -222,9 +220,6 @@ static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * con } vnode->cons = cons; - - /* reset the shared credict counter */ - rte_atomic32_set(&vnode->shared_credict_counter, (int32_t)vnode->sz_shared); return 0; error: @@ -234,7 +229,7 @@ error: static void synchronize_dataplane() { - rte_delay_ms(100);//assume each function in operation will finished after such time. + rte_delay_ms(100); // assume each function in operation will finished after such time. } /* VNode Structure Operation Functions */ @@ -277,20 +272,16 @@ struct vnode_prod * __vnode_common_create_prod(struct vnode * vnode, const char return prod; err: - if (prod != NULL) - { - rte_free(prod); - } - + rte_free(prod); return NULL; } struct variable_monitor_ioctl_cmd_args { - pid_t task_id; // current process id + pid_t task_id; // current process id char name[16]; - void * var_ptr; // virtual address - int var_len; // byte + void * var_ptr; // virtual address + int var_len; // byte long long threshold; // threshold value unsigned char unsigned_flag; // unsigned flag (true: unsigned, false: signed) unsigned char greater_flag; // reverse flag (true: >, false: <) @@ -420,42 +411,42 @@ struct vnode_cons * __vnode_common_cons_lookup(struct vnode * vnode, const char int __vnode_common_cons_attach(struct vnode * node, struct vnode_cons * cons) { - int ret; - assert(node != NULL && cons != NULL && cons->vnode == node); - - rte_spinlock_lock(&node->lock); - if (cons->cur_attach >= cons->nr_consq) - { - MR_ERROR("Too much attach request(cur_attach=%d, nr_consq=%d)", - cons->cur_attach, cons->nr_consq); ret = -1; - } - else - { - ret = cons->cur_attach++; - } - - rte_spinlock_unlock(&node->lock); - return ret; + int ret; + assert(node != NULL && cons != NULL && cons->vnode == node); + + rte_spinlock_lock(&node->lock); + if (cons->cur_attach >= cons->nr_consq) + { + MR_ERROR("Too much attach request(cur_attach=%d, nr_consq=%d)", cons->cur_attach, cons->nr_consq); + ret = -1; + } + else + { + ret = cons->cur_attach++; + } + + rte_spinlock_unlock(&node->lock); + return ret; } -int __vnode_common_prod_attach(struct vnode * node, struct vnode_prod* prod) +int __vnode_common_prod_attach(struct vnode * node, struct vnode_prod * prod) { - int ret; - assert(node != NULL && prod != NULL && prod->vnode == node); - - rte_spinlock_lock(&node->lock); - if (prod->cur_attach >= prod->nr_prodq) - { - MR_ERROR("Too much attach request(cur_attach=%d, nr_prodq=%d)", - prod->cur_attach, prod->nr_prodq); ret = -1; - } - else - { - ret = prod->cur_attach++; - } - - rte_spinlock_unlock(&node->lock); - return ret; + int ret; + assert(node != NULL && prod != NULL && prod->vnode == node); + + rte_spinlock_lock(&node->lock); + if (prod->cur_attach >= prod->nr_prodq) + { + MR_ERROR("Too much attach request(cur_attach=%d, nr_prodq=%d)", prod->cur_attach, prod->nr_prodq); + ret = -1; + } + else + { + ret = prod->cur_attach++; + } + + rte_spinlock_unlock(&node->lock); + return ret; } struct vnode_prod_stat * __vnode_common_prod_stat_get(struct vnode_prod * prod) @@ -515,7 +506,7 @@ struct vnode_cons_stat * __vnode_common_cons_stat_get(struct vnode_cons * cons) } /* calculate the stats at here instead of enq, deq */ - for(unsigned int consq = 0; consq < cons->nr_consq; consq++) + for (unsigned int consq = 0; consq < cons->nr_consq; consq++) { uint64_t consq_on_line = 0; uint64_t consq_deliver = 0; @@ -525,10 +516,10 @@ struct vnode_cons_stat * __vnode_common_cons_stat_get(struct vnode_cons * cons) unsigned int consq_q_len_max = 0; float consq_q_len_avg_max = 0; - for(unsigned int prodq = 0; prodq < prod->nr_prodq; prodq++) + for (unsigned int prodq = 0; prodq < prod->nr_prodq; prodq++) { struct tunnel_block * block = ACCESS_ONCE(cons->block); - if(block == NULL) + if (block == NULL) { return cons_stat; } @@ -631,7 +622,7 @@ int __vnode_common_delete(struct vnode * vnode) void __vnode_common_unpoison(struct vnode * vnode) { - MR_ASAN_UNPOISON_MEMORY_REGION(vnode, sizeof(struct vnode)); + MR_ASAN_UNPOISON_MEMORY_REGION(vnode, sizeof(struct vnode)); } void __vnode_common_unpoison_prod(struct vnode_prod * prod) diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h index bc2eef7..25808b4 100644 --- a/infra/src/vnode_common.h +++ b/infra/src/vnode_common.h @@ -30,34 +30,22 @@ struct tunnel_desc struct rte_ring * tunnel_object; /* Tunnel Size */ unsigned int tunnel_size; - /* tunnel exclusive size */ - unsigned int tunnel_exclusive_size; /* Tunnel Enqueue Buffer */ struct rte_mbuf ** en_buffer; - -#if 0 - /* Tunnel Enqueue Buffer Size */ - struct rte_mbuf ** en_returned_buffer; -#endif + struct rte_mbuf ** rt_buffer; /* second cacheline, read/write */ RTE_MARKER cacheline1 __rte_cache_min_aligned; + /* Tunnel Enqueue Buffer Size */ unsigned int sz_en_buffer; /* Tunnel Enqueue Buffer Used */ unsigned int sz_en_buffer_used; - /* shared tunnel use */ - unsigned int shared_credict_used; - /* counter */ - rte_atomic32_t * shared_credict_counter; - -#if 0 - /* Tunnel Enqueue Buffer Returned */ - unsigned int sz_en_buffer_returned; - /* Tunnel Enqueue Buffer Returned */ - unsigned int sz_en_buffer_returned_used; -#endif + /* Return Buffer Size */ + unsigned int sz_rt_buffer; + /* Return Buffer Used */ + unsigned int sz_rt_buffer_used; #if VNODE_CHECK_THREAD_SAFE /* For debug, to check concurrent access of en_buffer */ @@ -148,7 +136,6 @@ struct vnode /* Guarantees one operator(consumer or producer, create or destroy) a time */ rte_spinlock_t lock __rte_cache_aligned; - rte_atomic32_t shared_credict_counter __rte_cache_aligned; }; #ifndef MR_LIBVNODE_MAX_SZ_BURST diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c index 0978a20..cef42bf 100644 --- a/infra/src/vnode_mirror.c +++ b/infra/src/vnode_mirror.c @@ -1,18 +1,24 @@ - -/* Virtual Data Deliver Node - Packet Deliver Class - Author : Lu Qiuwen<[email protected]> - Zheng Chao<[email protected]> - Date : 2017-01-09 -*/ - #include <rte_mbuf.h> #include <rte_ring.h> #include <rte_version.h> #include <sys/eventfd.h> #include <unistd.h> - #include "vnode_common.h" +static inline unsigned int dist_tunnel_rt_objects_retrieve(struct tunnel_desc * desc, struct rte_mbuf * rt_objs[], + unsigned int nr_max_rt_objs) +{ + assert(desc->sz_rt_buffer_used <= desc->sz_rt_buffer); + unsigned int nr_rt_objs = desc->sz_rt_buffer_used; + for (unsigned int i = 0; i < nr_rt_objs; i++) + { + rt_objs[i] = desc->rt_buffer[i]; + } + + desc->sz_rt_buffer_used = 0; + return nr_rt_objs; +} + static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq, unsigned int consq, struct tunnel_desc * desc) { @@ -23,36 +29,7 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons return; } - unsigned int nr_ring_count = desc->shared_credict_counter != NULL ? rte_ring_count(desc->tunnel_object) : 0; - unsigned int nr_ring_to_use = nr_ring_count + desc->sz_en_buffer_used; - unsigned int nr_shared_credict = 0; - - if (nr_ring_to_use > desc->tunnel_exclusive_size) - { - /* need to apply shared credict */ - assert(nr_ring_to_use >= (desc->tunnel_exclusive_size + desc->shared_credict_used)); - - nr_shared_credict = nr_ring_to_use - (desc->tunnel_exclusive_size + desc->shared_credict_used); - while (1) - { - uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter); - uint32_t new_value = cur_value > nr_shared_credict ? cur_value - nr_shared_credict : 0; - - if (rte_atomic32_cmpset((volatile uint32_t *)desc->shared_credict_counter, cur_value, new_value)) - { - nr_shared_credict = cur_value > new_value ? cur_value - new_value : 0; - break; - } - } - - desc->shared_credict_used += nr_shared_credict; - assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size)); - } - - assert((desc->tunnel_exclusive_size + desc->shared_credict_used) >= nr_ring_count); - - unsigned int n_can_send = (desc->tunnel_exclusive_size + desc->shared_credict_used) - nr_ring_count; - unsigned int n_to_send = RTE_MIN(desc->sz_en_buffer_used, n_can_send); + unsigned int n_to_send = desc->sz_en_buffer_used; size_t n_send_len = 0; for (unsigned int k = 0; k < desc->sz_en_buffer_used; k++) @@ -75,8 +52,7 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons } unsigned int n_free_space; - unsigned int n_send = - rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); + unsigned int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); unsigned int n_send_missed = desc->sz_en_buffer_used - n_send; /* packet is missed */ @@ -86,18 +62,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons { struct rte_mbuf * object_to_be_free = desc->en_buffer[k]; n_send_len -= rte_pktmbuf_data_len(object_to_be_free); - rte_pktmbuf_free(object_to_be_free); - } - /* return the shared credict */ - unsigned int nr_shared_credict_to_release = RTE_MIN(n_send_missed, nr_shared_credict); - if (nr_shared_credict_to_release > 0) - { - rte_atomic32_add(desc->shared_credict_counter, (int32_t)nr_shared_credict_to_release); - desc->shared_credict_used -= nr_shared_credict_to_release; + /* move the mbuf to return buffer */ + assert(desc->sz_rt_buffer_used < desc->sz_rt_buffer); + desc->rt_buffer[desc->sz_rt_buffer_used++] = desc->en_buffer[k]; } - - assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size)); } struct vnode_cons_notify * cons_notify_ctx = &cons->notify[consq]; @@ -161,22 +130,11 @@ static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_co out: rte_spinlock_unlock(&desc->lock_thread_safe_check); #endif - - return; } -static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int nr_max_obj) +static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, unsigned int nr_max_obj) { unsigned int nr_deq = rte_ring_sc_dequeue_burst(desc->tunnel_object, obj, nr_max_obj, NULL); - unsigned int shared_credict_to_release = RTE_MIN(nr_deq, desc->shared_credict_used); - - if (shared_credict_to_release > 0) - { - rte_atomic32_add(desc->shared_credict_counter, (int32_t)shared_credict_to_release); - desc->shared_credict_used -= shared_credict_to_release; - } - - assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size)); return (int)nr_deq; } @@ -193,14 +151,15 @@ static inline void dist_tunnel_block_flush(struct tunnel_block * block, int prod } } -static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, int prodq, struct rte_mbuf * obj[], - uint32_t hash[], int nr_obj) +static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, unsigned int prodq, struct rte_mbuf * obj[], + uint32_t hash[], unsigned int nr_obj) { assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST); for (unsigned int i = 0; i < nr_obj; i++) { assert(obj[i] != NULL); unsigned int consq = hash[i] % block->nr_consq; + struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq); dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[i]); } @@ -208,56 +167,35 @@ static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * blo dist_tunnel_block_flush(block, prodq); } -#if 0 - -#define __FWDSTEP 4 - -static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, - int prodq, struct rte_mbuf * obj[], uint32_t hash[], int nr_obj) +static inline unsigned int dist_tunnel_block_rt_objects_retrieve(struct tunnel_block * block, unsigned int prodq, + struct rte_mbuf * rt_objs[], + unsigned int nr_max_rt_objs) { - struct tunnel_desc * tunnel; - unsigned int idx = 0; - unsigned int consq; + unsigned int nr_rt_objs = 0; + unsigned int nr_rt_objs_left = nr_max_rt_objs; - switch (nr_obj % 4) + for (unsigned int consq = 0; consq < block->nr_consq; consq++) { - case 0: - while (idx != nr_obj) { - consq = hash[idx] % block->nr_consq; - tunnel = *tunnel_block_locate(block, prodq, consq); - dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]); - idx++; - case 3: - consq = hash[idx] % block->nr_consq; - tunnel = *tunnel_block_locate(block, prodq, consq); - dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]); - idx++; - case 2: - consq = hash[idx] % block->nr_consq; - tunnel = *tunnel_block_locate(block, prodq, consq); - dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]); - idx++; - case 1: - consq = hash[idx] % block->nr_consq; - tunnel = *tunnel_block_locate(block, prodq, consq); - dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]); - idx++; - } + struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq); + unsigned int nr_rt_objs_recv = dist_tunnel_rt_objects_retrieve(tunnel, &rt_objs[nr_rt_objs], nr_rt_objs_left); + nr_rt_objs += nr_rt_objs_recv; + nr_rt_objs_left -= nr_rt_objs_recv; } -} -#endif + assert(nr_rt_objs <= nr_max_rt_objs); + return nr_rt_objs; +} // Tunnel Block Dequeue, dequeue from block, only used by cons. // TODO: rewrite in SSE/SSE2/AVX/AVX2 intrinsics -static inline int dist_tunnel_block_dequeue(struct tunnel_block * block, int consq, struct rte_mbuf * obj[], - int nr_max_obj) +static inline unsigned int dist_tunnel_block_dequeue(struct tunnel_block * block, unsigned int consq, struct rte_mbuf * obj[], + unsigned int nr_max_obj) { unsigned int nr_obj = 0, nr_obj_recv = 0; unsigned int nr_obj_left = nr_max_obj; - for (int prodq = 0; prodq < block->nr_prodq; prodq++) + for (unsigned int prodq = 0; prodq < block->nr_prodq; prodq++) { struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq); nr_obj_recv = dist_tunnel_dequeue(tunnel, &obj[nr_obj], nr_obj_left); @@ -269,29 +207,32 @@ static inline int dist_tunnel_block_dequeue(struct tunnel_block * block, int con return nr_obj; } +int vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * rt_objs[], + unsigned int nr_max_rt_objects) +{ + struct tunnel_block * block = ACCESS_ONCE(prod->block); + if (likely(block != NULL)) + { + return (int)dist_tunnel_block_rt_objects_retrieve(block, prodq, rt_objs, nr_max_rt_objects); + } + else + { + return 0; + } +} int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[], - uint32_t hash[], int nr_objects) + uint32_t hash[], unsigned int nr_objects) { assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST); struct tunnel_block * block = ACCESS_ONCE(prod->block); if (unlikely(block == NULL)) { - goto failure; + return -1; } - dist_tunnel_block_enqueue_with_hash(block, (int)prodq, objects, hash, nr_objects); - return 0; - -failure: - for (int i = 0; i < nr_objects; i++) - { - rte_pktmbuf_free(objects[i]); - } - - VNODE_STAT_UPDATE(prod, prodq, on_line, nr_objects); - VNODE_STAT_UPDATE(prod, prodq, missed, nr_objects); + dist_tunnel_block_enqueue_with_hash(block, prodq, objects, hash, nr_objects); return 0; } @@ -301,7 +242,7 @@ int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, str struct tunnel_block * block = ACCESS_ONCE(cons->block); if (likely(block != NULL)) { - return dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects); + return (int)dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects); } else { @@ -318,9 +259,9 @@ void vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq) } } -struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_shared, - unsigned int sz_buffer, unsigned int notify_cons_when_rx, - unsigned int batch_interval_us, unsigned int en_q_len_monitor) +struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_buffer, + unsigned int notify_cons_when_rx, unsigned int batch_interval_us, + unsigned int en_q_len_monitor) { struct vnode * vnode_common = __vnode_common_create(sym, sz_exclusive, sz_buffer, notify_cons_when_rx); @@ -331,10 +272,7 @@ struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, } vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_cycles() / US_PER_S; - vnode_common->sz_shared = sz_shared; vnode_common->en_q_len_monitor = en_q_len_monitor; - - rte_atomic32_init(&vnode_common->shared_credict_counter); return vnode_common; } @@ -343,11 +281,6 @@ int vnode_mirror_delete(struct vnode * vnode) return __vnode_common_delete(vnode); } -void vnode_mirror_common_unpoison(struct vnode * vnode) -{ - __vnode_common_unpoison(vnode); -} - __USE_COMMON_VNODE_CREATE_PROD(mirror) __USE_COMMON_VNODE_CREATE_CONS(mirror) __USE_COMMON_VNODE_DELETE_PROD(mirror) diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc index 6fb3dab..e6c8a50 100644 --- a/infra/test/TestVNode.cc +++ b/infra/test/TestVNode.cc @@ -39,7 +39,7 @@ class TestCaseVNodeQueue : public TestCaseVNode void SetUp() override { - vnode_ = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0); + vnode_ = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0); ASSERT_NE(vnode_, nullptr); assert(prod_ == nullptr); @@ -82,7 +82,7 @@ struct rte_mempool * TestCaseVNode::pktmbuf_pool_ = nullptr; TEST_F(TestCaseVNode, CreateAndDeleteInEmptyNode) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0); EXPECT_NE(vnode_ptr, nullptr); int ret = vnode_mirror_delete(vnode_ptr); @@ -91,7 +91,7 @@ TEST_F(TestCaseVNode, CreateAndDeleteInEmptyNode) TEST_F(TestCaseVNode, CreateAndDelete) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -109,7 +109,7 @@ TEST_F(TestCaseVNode, CreateAndDelete) TEST_F(TestCaseVNode, CreateAndDeleteMultiThread) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); /* create multiple thread and run them at same time */ @@ -141,7 +141,7 @@ TEST_F(TestCaseVNode, CreateAndDeleteMultiThread) TEST_F(TestCaseVNode, TestVNodeProdAndConsLookup) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -164,7 +164,7 @@ TEST_F(TestCaseVNode, TestVNodeProdAndConsLookup) TEST_F(TestCaseVNode, TestVNodeEnqueue) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -183,15 +183,15 @@ TEST_F(TestCaseVNode, TestVNodeEnqueue) int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs)); ASSERT_EQ(ret, 0); - for (unsigned int i = 0; i < RTE_DIM(enq_objs); i++) + for (auto & enq_obj : enq_objs) { - enq_objs[i]->hash.usr = 0x4d5a; + enq_obj->hash.usr = 0x4d5a; } uint32_t enq_hashs[TEST_COUNT] = {}; - for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++) + for (unsigned int & enq_hash : enq_hashs) { - enq_hashs[i] = 0x4d5a; + enq_hash = 0x4d5a; } int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, RTE_DIM(enq_hashs)); @@ -209,10 +209,10 @@ TEST_F(TestCaseVNode, TestVNodeEnqueue) vnode_mirror_delete(vnode_ptr); } -TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict) +TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) { /* create multiple thread */ - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 2048, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -239,21 +239,45 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict) std::thread test_thread_1([&prod, &enq_objs, &enq_hashs] { int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, 512); EXPECT_EQ(enq_ret, 0); + + struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objects, RTE_DIM(rt_objects)); + EXPECT_EQ(rt_ret, 481); + + rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); std::thread test_thread_2([&prod, &enq_objs, &enq_hashs] { int enq_ret = vnode_mirror_enqueue_bulk(prod, 1, enq_objs + 512, enq_hashs, 512); EXPECT_EQ(enq_ret, 0); + + struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objects, RTE_DIM(rt_objects)); + EXPECT_EQ(rt_ret, 481); + + rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); std::thread test_thread_3([&prod, &enq_objs, &enq_hashs] { int enq_ret = vnode_mirror_enqueue_bulk(prod, 2, enq_objs + 1024, enq_hashs, 512); EXPECT_EQ(enq_ret, 0); + + struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 2, rt_objects, RTE_DIM(rt_objects)); + EXPECT_EQ(rt_ret, 481); + + rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); std::thread test_thread_4([&prod, &enq_objs, &enq_hashs] { int enq_ret = vnode_mirror_enqueue_bulk(prod, 3, enq_objs + 1536, enq_hashs, 512); EXPECT_EQ(enq_ret, 0); + + struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 3, rt_objects, RTE_DIM(rt_objects)); + EXPECT_EQ(rt_ret, 481); + + rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); test_thread_1.join(); @@ -275,25 +299,26 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict) } EXPECT_EQ(prod_on_line_total, 2048); - EXPECT_EQ(prod_deliver_total, 2048); - EXPECT_EQ(prod_missed_total, 0); + EXPECT_EQ(prod_deliver_total, 124); + EXPECT_EQ(prod_missed_total, 1924); /* on cons side */ struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat[0].on_line, 2048); - EXPECT_EQ(cons_stat[0].deliver, 2048); - EXPECT_EQ(cons_stat[0].missed, 0); + EXPECT_EQ(cons_stat[0].deliver, 124); + EXPECT_EQ(cons_stat[0].missed, 1924); - int deq_ret = vnode_mirror_dequeue_burst(cons, 0, enq_objs, RTE_DIM(enq_objs)); - EXPECT_EQ(deq_ret, 2048); + struct rte_mbuf * deq_objs[TEST_MBUFS_COUNT]; + int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); + EXPECT_EQ(deq_ret, 124); - rte_pktmbuf_free_bulk(enq_objs, deq_ret); + rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); } TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 512, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -314,57 +339,76 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) uint32_t enq_hashs[TEST_MBUFS_COUNT] = {}; - for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++) + for (unsigned int & enq_hash : enq_hashs) { - enq_hashs[i] = 0x4d5a; + enq_hash = 0x4d5a; } /* first 512 mbufs, use the exclusive credit */ int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, 512); EXPECT_EQ(enq_ret, 0); - /* second 512 mbufs, use the shared credict */ + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret, 481); + rte_pktmbuf_free_bulk(deq_objs, rt_ret); + + /* at here, the ring is full, no object can be enqueue */ int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 512, enq_hashs, 512); EXPECT_EQ(enq_ret_2, 0); - /* until here, we have 512 + 32 credict, so only 544 mbufs can be enqueue. */ + int rt_ret_2 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret_2, 512); + rte_pktmbuf_free_bulk(deq_objs, rt_ret_2); + + /* until here, we have 31 objects enqueue, so only 544 mbufs can be enqueue. */ int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); - EXPECT_EQ(deq_ret_1, 544); + EXPECT_EQ(deq_ret_1, 31); struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod); EXPECT_EQ(prod_stat->on_line, 1024); - EXPECT_EQ(prod_stat->deliver, 544); - EXPECT_EQ(prod_stat->missed, 480); + EXPECT_EQ(prod_stat->deliver, 31); + EXPECT_EQ(prod_stat->missed, 993); struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat->on_line, 1024); - EXPECT_EQ(cons_stat->deliver, 544); - EXPECT_EQ(cons_stat->missed, 480); + EXPECT_EQ(cons_stat->deliver, 31); + EXPECT_EQ(cons_stat->missed, 993); /* free these mbufs */ rte_pktmbuf_free_bulk(deq_objs, deq_ret_1); + /*****************************************************************************************/ + /* put another 512 mbufs */ int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1024, enq_hashs, 512); EXPECT_EQ(enq_ret_3, 0); + /* get the packet needs to be free */ + int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret_3, 481); + rte_pktmbuf_free_bulk(deq_objs, rt_ret_3); + int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1536, enq_hashs, 512); EXPECT_EQ(enq_ret_4, 0); + int rt_ret_4 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret_4, 512); + rte_pktmbuf_free_bulk(deq_objs, rt_ret_4); + int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); - EXPECT_EQ(deq_ret_2, 544); + EXPECT_EQ(deq_ret_2, 31); /* another round, the stat should be double */ prod_stat = vnode_mirror_prod_stat_get(prod); cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(prod_stat->on_line, 2048); - EXPECT_EQ(prod_stat->deliver, 1088); - EXPECT_EQ(prod_stat->missed, 960); + EXPECT_EQ(prod_stat->deliver, 62); + EXPECT_EQ(prod_stat->missed, 1986); EXPECT_EQ(cons_stat->on_line, 2048); - EXPECT_EQ(cons_stat->deliver, 1088); - EXPECT_EQ(cons_stat->missed, 960); + EXPECT_EQ(cons_stat->deliver, 62); + EXPECT_EQ(cons_stat->missed, 1986); rte_pktmbuf_free_bulk(deq_objs, deq_ret_2); vnode_mirror_delete(vnode_ptr); @@ -372,7 +416,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 512, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -407,9 +451,9 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* enqueue */ uint32_t enq_hashs[32] = {}; - for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++) + for (unsigned int & enq_hash : enq_hashs) { - enq_hashs[i] = 0x4d5a; + enq_hash = 0x4d5a; } /* first 32 mbufs, use the exclusive credit */ @@ -420,26 +464,37 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs_q1_shared, enq_hashs, 32); EXPECT_EQ(enq_ret_2, 0); - /* third 32 mbufs, use the exclusive credit */ + /* retrieve the drop packets */ + struct rte_mbuf * rt_objs[128] = {}; + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objs, 128); + EXPECT_EQ(rt_ret, 33); + rte_pktmbuf_free_bulk(rt_objs, rt_ret); + + /* third 32 mbufs, another queue, 31 packet should be enqueue. */ int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 1, enq_objs_q2_exclusive, enq_hashs, 32); EXPECT_EQ(enq_ret_3, 0); - /* fourth 32 mbufs, use the shared credict */ + /* fourth, 32 mbufs should be dropped */ int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 1, enq_objs_q2_shared, enq_hashs, 32); EXPECT_EQ(enq_ret_4, 0); + /* retrieve the drop packets */ + rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objs, 128); + EXPECT_EQ(rt_ret, 33); + rte_pktmbuf_free_bulk(rt_objs, rt_ret); + /* dequeue */ struct rte_mbuf * deq_objs[128] = {}; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); - EXPECT_EQ(deq_ret, 128); + EXPECT_EQ(deq_ret, 62); rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); } -TEST_F(TestCaseVNode, TestVNodeEnqueueUseSharedCredict) +TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) { - struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 512, 512, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 512, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -458,34 +513,52 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueUseSharedCredict) int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs)); ASSERT_EQ(ret, 0); - for (unsigned int i = 0; i < RTE_DIM(enq_objs); i++) + for (auto & enq_obj : enq_objs) { - enq_objs[i]->hash.usr = 0x4d5a; + enq_obj->hash.usr = 0x4d5a; } uint32_t enq_hashs[TEST_MBUFS_COUNT] = {}; - for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++) + for (unsigned int & enq_hash : enq_hashs) { - enq_hashs[i] = 0x4d5a; + enq_hash = 0x4d5a; } /* first 512 mbufs, use the exclusive credit */ int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, 512); EXPECT_EQ(enq_ret, 0); - /* second 512 mbufs, use the shared credict */ + int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret, 1); + rte_pktmbuf_free_bulk(deq_objs, rt_ret); + + /* second 512 mbufs should be rejected */ int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 512, enq_hashs, 512); EXPECT_EQ(enq_ret_2, 0); + int rt_ret_2 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret_2, 512); + rte_pktmbuf_free_bulk(deq_objs, 512); + /* third 512 mbufs should be rejected */ int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1024, enq_hashs, 512); EXPECT_EQ(enq_ret_3, 0); + int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret_3, 512); + + /* free the rejected packets */ + rte_pktmbuf_free_bulk(deq_objs, 512); + int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1536, enq_hashs, 512); EXPECT_EQ(enq_ret_4, 0); + int rt_ret_4 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); + EXPECT_EQ(rt_ret_4, 512); + rte_pktmbuf_free_bulk(deq_objs, 512); + int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); - EXPECT_EQ(deq_ret, 1024); + EXPECT_EQ(deq_ret, 511); rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); @@ -493,7 +566,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueUseSharedCredict) TEST_F(TestCaseVNodeQueue, MultQueueEnqueue) { - struct vnode * vnode_ptr = vnode_mirror_create("vnode", 1024, 0, 32, 0, 0, 0); + struct vnode * vnode_ptr = vnode_mirror_create("vnode", 1024, 32, 0, 0, 0); ASSERT_NE(vnode_ptr, nullptr); struct vnode_prod * prod; @@ -512,15 +585,15 @@ TEST_F(TestCaseVNodeQueue, MultQueueEnqueue) int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs)); ASSERT_EQ(ret, 0); - for (unsigned int i = 0; i < RTE_DIM(enq_objs); i++) + for (auto & enq_obj : enq_objs) { - enq_objs[i]->hash.usr = 0x4d5a; + enq_obj->hash.usr = 0x4d5a; } uint32_t enq_hashs[TEST_COUNT] = {}; - for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++) + for (unsigned int & enq_hash : enq_hashs) { - enq_hashs[i] = 0x4d5a; + enq_hash = 0x4d5a; } for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++) @@ -539,8 +612,8 @@ TEST_F(TestCaseVNodeQueue, MultQueueEnqueue) EXPECT_EQ(cons_stat[0].on_line, 32); EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); - EXPECT_EQ(cons_stat[0].q_len_max, 32); - EXPECT_EQ(cons_stat[0].q_len_avg_max, 32); + EXPECT_EQ(cons_stat[0].q_len_max, 5); + EXPECT_LE(cons_stat[0].q_len_avg_max, 5); int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); EXPECT_EQ(deq_ret, 32); @@ -554,9 +627,7 @@ TEST_F(TestCaseVNodeQueue, MultQueueEnqueue) EXPECT_EQ(cons_stat[0].on_line, 32); EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); - - /* after dequeue, q_len should be zero */ - EXPECT_EQ(cons_stat[0].q_len_max, 32); + EXPECT_EQ(cons_stat[0].q_len_max, 5); EXPECT_LE(cons_stat[0].q_len_avg_max, 32); rte_pktmbuf_free_bulk(deq_objs, deq_ret); diff --git a/service/include/sc_vdev.h b/service/include/sc_vdev.h index 6314ffd..b6fad5e 100644 --- a/service/include/sc_vdev.h +++ b/service/include/sc_vdev.h @@ -65,26 +65,6 @@ struct _vdev * flush : flush packet buffer */ -#if 0 - int (*dispatch)(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, - struct rte_mbuf * overload_pkts[], unsigned int * nr_overload_pkts, int flags); -#endif - - int (*dispatch)(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags); - int (*collect)(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags); - int (*idle_pool)(struct _vdev * _vdev, queue_id_t qid); - int (*stats_get)(struct _vdev * _vdev, struct vdev_stat_info * stat_info); - - /* 销毁函数 */ - int (*destory)(struct _vdev * _vdev); - - /* VDI创建 */ - struct vdev_instance * ( - *vdi_create)(struct _vdev * _vdev, const char * appsym, unsigned int nr_rxstream, unsigned int nr_txstream); - - /* VDI销毁 */ - int (*vdi_destory)(struct vdev_instance * vdi); - /* 统计信息暂存,通过外部接口设置,用于计算速度 */ struct vdev_stat_info stat_info_last; }; @@ -122,6 +102,8 @@ int vdev_dispatch(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], // 收集数据 int vdev_collect(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags); +int vdev_rt_pkts_retrieve(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts); + // 空闲轮询 int vdev_idle_poll(struct vdev * vdev, queue_id_t qid); @@ -136,8 +118,22 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in unsigned int sz_buffer, unsigned int batch_interval_in_us, unsigned int en_q_len_monitor, struct rte_mempool * direct_pool); -int vdev_loop_create(struct vdev_main * v_main, const char * symbol, unsigned int sz_tunnel, unsigned int sz_buffer, - struct rte_mempool * direct_pool, struct rte_mempool * indirect_pool); +int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, + int flags); + +int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, + int flags); + +int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid); + +int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info); + +struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym, + unsigned int nr_rxstream, unsigned int nr_txstream); + +int vdev_data_rt_pkts_retrieve(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts); + +int vdev_data_instance_destory(struct vdev_instance * vdi); int vdev_main_init(struct sc_main * sc); diff --git a/service/src/node_shmdev.c b/service/src/node_shmdev.c index 4aa6506..de700bf 100644 --- a/service/src/node_shmdev.c +++ b/service/src/node_shmdev.c @@ -131,16 +131,20 @@ uint16_t shmdev_tx_node_process(struct rte_graph * graph, struct rte_node * node while (nr_pkts) { - if (nr_pkts > RTE_GRAPH_BURST_SIZE) - { - vdev_dispatch(shm_dev_desc, graph->id, mbufs, RTE_GRAPH_BURST_SIZE, 0); - nr_pkts -= RTE_GRAPH_BURST_SIZE; - mbufs += RTE_GRAPH_BURST_SIZE; - } - else + unsigned int nr_mbufs_this_batch = (nr_pkts > RTE_GRAPH_BURST_SIZE) ? RTE_GRAPH_BURST_SIZE : nr_pkts; + vdev_dispatch(shm_dev_desc, graph->id, mbufs, nr_mbufs_this_batch, 0); + + nr_pkts -= nr_mbufs_this_batch; + mbufs += nr_mbufs_this_batch; + + /* retrieve the backpressure packets */ + struct rte_mbuf * rt_mbufs[RTE_GRAPH_BURST_SIZE]; + int ret = vdev_rt_pkts_retrieve(shm_dev_desc, graph->id, rt_mbufs, RTE_GRAPH_BURST_SIZE); + + /* these packet to pkt drop node */ + if (unlikely(ret > 0)) { - vdev_dispatch(shm_dev_desc, graph->id, mbufs, nr_pkts, 0); - nr_pkts = 0; + rte_node_enqueue(graph, node, 0, (void **)rt_mbufs, ret); } } diff --git a/service/src/vdata.c b/service/src/vdata.c index 91d74d2..e87b52c 100644 --- a/service/src/vdata.c +++ b/service/src/vdata.c @@ -13,7 +13,7 @@ #include <sc_common.h> #include <sc_vdev.h> -static int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, +int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags) { hash_t hash_value[MR_BURST_MAX]; @@ -32,7 +32,7 @@ static int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_m return vnode_mirror_enqueue_bulk(_vdev->vnode_rx_prod, qid, pkts, hash_value, nr_pkts); } -static int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, +int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags) { int __nr_mbufs_out = 0; @@ -61,13 +61,18 @@ static int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mb return __nr_mbufs_out; } -static int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid) +int vdev_data_rt_pkts_retrieve(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts) +{ + return vnode_mirror_rt_object_retrieve(_vdev->vnode_rx_prod, qid, pkts, nr_pkts); +} + +int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid) { vnode_mirror_flush(_vdev->vnode_rx_prod, qid); return 0; } -static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info) +int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info) { struct vnode_prod_stat * st_prod_rx = vnode_mirror_prod_stat_get(_vdev->vnode_rx_prod); struct vnode_cons_stat * st_cons_tx = vnode_mirror_cons_stat_get(_vdev->vnode_tx_cons); @@ -80,13 +85,6 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta stat_info->rx_deliver[i] = VNODE_STAT_READ(&st_prod_rx[i].deliver); stat_info->rx_missed[i] = VNODE_STAT_READ(&st_prod_rx[i].missed); stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].total_len); - -#if 0 - stat_info->notify_state_running[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_running); - stat_info->notify_state_ready[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_ready); - stat_info->notify_state_waiting[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_waiting); -#endif - stat_info->batch_size_count[i] = VNODE_STAT_READ(&st_prod_rx[i].batch_size_count); stat_info->batch_size_total[i] = VNODE_STAT_READ(&st_prod_rx[i].batch_size_total); } @@ -107,7 +105,7 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta return 0; } -static int vdev_data_instance_destory(struct vdev_instance * vdi) +int vdev_data_instance_destory(struct vdev_instance * vdi) { if (vdi->vnode_rx_cons != NULL) vnode_mirror_delete_cons(vdi->vnode_rx_cons); @@ -121,7 +119,7 @@ static int vdev_data_instance_destory(struct vdev_instance * vdi) } /* 创建虚设备实例,应用使用 */ -static struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym, +struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym, unsigned int nr_rxstream, unsigned int nr_txstream) { struct vdev_instance * vdi = ZMALLOC(sizeof(struct vdev_instance)); @@ -227,12 +225,11 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in snprintf(vnode_sym_ltx, sizeof(vnode_sym_ltx), "%s-ltx", vdev_info->symbol); /* 创建VNODE */ - _vdev->vnode_rx = vnode_mirror_create(vnode_sym_rx, sz_tunnel_rx_exclusive, sz_tunnel_rx_shared, sz_buffer, 1, - batch_interval_in_us, en_q_len_monitor); - - _vdev->vnode_tx = vnode_mirror_create(vnode_sym_tx, sz_tunnel_tx, 0, sz_buffer, 0, 0, 0); - _vdev->vnode_ftx = vnode_mirror_create(vnode_sym_ftx, sz_tunnel_tx, 0, 0, 0, 0, 0); - _vdev->vnode_ltx = vnode_mirror_create(vnode_sym_ltx, sz_tunnel_tx, 0, 0, 0, 0, 0); + _vdev->vnode_rx = + vnode_mirror_create(vnode_sym_rx, sz_tunnel_rx_exclusive, sz_buffer, 1, batch_interval_in_us, en_q_len_monitor); + _vdev->vnode_tx = vnode_mirror_create(vnode_sym_tx, sz_tunnel_tx, sz_buffer, 0, 0, 0); + _vdev->vnode_ftx = vnode_mirror_create(vnode_sym_ftx, sz_tunnel_tx, 0, 0, 0, 0); + _vdev->vnode_ltx = vnode_mirror_create(vnode_sym_ltx, sz_tunnel_tx, 0, 0, 0, 0); #define ERR_VERIFY(x, ...) \ do \ @@ -259,15 +256,6 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in ERR_VERIFY(_vdev->vnode_tx_cons, "Create vdev %s tx vnode consumer failed. ", vdev_info->symbol); ERR_VERIFY(_vdev->vnode_ftx_cons, "Create vdev %s fast tx vnode consumer failed. ", vdev_info->symbol); - /* 注册回调函数 */ - _vdev->dispatch = vdev_data_dispatch; - _vdev->collect = vdev_data_collect; - _vdev->idle_pool = vdev_data_idle_poll; - _vdev->destory = vdev_data_destory; - _vdev->stats_get = vdev_data_stats_get; - _vdev->vdi_create = vdev_data_instance_create; - _vdev->vdi_destory = vdev_data_instance_destory; - /* 加入虚设备列表 */ MR_VERIFY(v_main->vdev_max_idx <= RTE_DIM(v_main->_vdev_array)); vdev_info->port_id = v_main->vdev_max_idx; @@ -277,7 +265,10 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in errout: if (_vdev != NULL) + { vdev_data_destory(_vdev); + } + rte_free(_vdev); return RT_ERR; diff --git a/service/src/vdev.c b/service/src/vdev.c index 491f0f7..7ce3eff 100644 --- a/service/src/vdev.c +++ b/service/src/vdev.c @@ -62,27 +62,33 @@ int vdev_dispatch(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], { struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev); mr_pdump_tx(vdev->port_id, qid, pkts, nr_pkts); - return _vdev->dispatch(_vdev, qid, pkts, nr_pkts, flags); + return vdev_data_dispatch(_vdev, qid, pkts, nr_pkts, flags); } int vdev_collect(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags) { struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev); - unsigned int rx_nr_mbufs = _vdev->collect(_vdev, qid, pkts, nr_pkts, flags); + unsigned int rx_nr_mbufs = vdev_data_collect(_vdev, qid, pkts, nr_pkts, flags); mr_pdump_rx(vdev->port_id, qid, pkts, rx_nr_mbufs); - return rx_nr_mbufs; + return (int)rx_nr_mbufs; +} + +int vdev_rt_pkts_retrieve(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts) +{ + struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev); + return vdev_data_rt_pkts_retrieve(_vdev, qid, pkts, nr_pkts); } int vdev_idle_poll(struct vdev * vdev, queue_id_t qid) { struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev); - return _vdev->idle_pool(_vdev, qid); + return vdev_data_idle_poll(_vdev, qid); } int vdev_stats_get(struct vdev * vdev, struct vdev_stat_info * stat_info) { struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev); - return _vdev->stats_get(_vdev, stat_info); + return vdev_data_stats_get(_vdev, stat_info); } void vdev_stats_last_save(struct vdev * vdev, struct vdev_stat_info * stat_info_last) @@ -136,10 +142,8 @@ static int vdev_instance_create_handler(const struct rte_mp_msg * msg, const voi } struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev); - - /* 申请VDI */ struct vdev_instance * vdi = - _vdev->vdi_create(_vdev, app_object->symbol, msg_req->nr_rxstream, msg_req->nr_txstream); + vdev_data_instance_create(_vdev, app_object->symbol, msg_req->nr_rxstream, msg_req->nr_txstream); if (vdi == NULL) { @@ -210,9 +214,7 @@ static void shmdev_event_handler_app_unregister(struct app_main * app_main, stru /* Mark the vdev as not in use */ vdi->vdev->in_use = VDEV_NOT_IN_USE; - - struct _vdev * _vdev = container_of(vdi->vdev, struct _vdev, vdev); - _vdev->vdi_destory(vdi); + vdev_data_instance_destory(vdi); } FREE(vdev_main_pme); |
