summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author陆秋文 <[email protected]>2024-04-01 07:59:57 +0000
committer宋延超 <[email protected]>2024-04-01 07:59:57 +0000
commit3a51336967fd8027e9ae4000bd81ce2443e9ba87 (patch)
treeab32109085ee4ad68597f9f0ebb7bf7074c385da
parentfdb63a8888c061cf6fd9887d715c487938b92703 (diff)
Feature retrieve object of backpressure
-rw-r--r--app/CMakeLists.txt2
-rw-r--r--app/include/mrapp.h6
-rw-r--r--app/include/mrdp_trace.h1
-rw-r--r--app/include/neigh.h53
-rw-r--r--app/src/arp.c204
-rw-r--r--app/src/icmp.c98
-rw-r--r--app/src/monit.c9
-rw-r--r--app/src/mrb.c35
-rw-r--r--app/src/neigh.c317
-rw-r--r--app/src/rawio.c69
-rw-r--r--app/src/sendpath.c238
-rw-r--r--app/src/tap.c6
-rw-r--r--include/external/marsio.h2
-rw-r--r--infra/include/vnode.h19
-rw-r--r--infra/src/vnode_common.c48
-rw-r--r--infra/src/vnode_common.h25
-rw-r--r--infra/src/vnode_mirror.c185
-rw-r--r--infra/test/TestVNode.cc187
-rw-r--r--service/include/sc_vdev.h40
-rw-r--r--service/src/node_shmdev.c22
-rw-r--r--service/src/vdata.c47
-rw-r--r--service/src/vdev.c24
22 files changed, 345 insertions, 1292 deletions
diff --git a/app/CMakeLists.txt b/app/CMakeLists.txt
index b891b50..3941080 100644
--- a/app/CMakeLists.txt
+++ b/app/CMakeLists.txt
@@ -5,7 +5,7 @@ include_directories(${CMAKE_SOURCE_DIR}/include/internal)
include_directories(include)
include_directories(../service/include)
-add_library(marsio SHARED src/marsio.c src/arp.c src/icmp.c src/neigh.c src/rawio.c src/mrb.c
+add_library(marsio SHARED src/marsio.c src/rawio.c src/mrb.c
src/sendpath.c src/monit.c src/tap.c src/dp_trace.c)
set_target_properties(marsio PROPERTIES VERSION ${MARSIO_VERSION_MAJOR}.${MARSIO_VERSION_MINOR})
diff --git a/app/include/mrapp.h b/app/include/mrapp.h
index 1ddccbc..0ea11bb 100644
--- a/app/include/mrapp.h
+++ b/app/include/mrapp.h
@@ -5,7 +5,6 @@
#include <ldbc.h>
#include <marsio.h>
#include <mr_rte_msg.h>
-#include <neigh.h>
#include <pcap/pcap.h>
#include <rte_epoll.h>
#include <tap.h>
@@ -117,8 +116,6 @@ struct mr_instance
struct mr_vdev vdevs[MR_VDEV_MAX];
/* 虚设备实例数量 */
unsigned int nr_vdevs;
- /* 邻居管理器 */
- struct neighbour_manager * neigh;
/* 负载均衡器 */
struct distributer * dist_object;
/* 静态邻居表 */
@@ -188,4 +185,5 @@ int mrapp_monit_loop(struct mr_instance * instance);
int mrapp_packet_send_burst(struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs);
-int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs);
+int mrapp_packet_fast_send_burst(struct mr_instance * instance,
+ struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs);
diff --git a/app/include/mrdp_trace.h b/app/include/mrdp_trace.h
index 7bcfef5..71cafd4 100644
--- a/app/include/mrdp_trace.h
+++ b/app/include/mrdp_trace.h
@@ -1,6 +1,7 @@
#pragma once
#include "marsio.h"
+#include <cJSON.h>
int marsio_dp_trace_init(struct mr_instance * instance);
void marsio_dp_trace_record_write(struct mr_instance * instance, marsio_buff_t * mbuf);
diff --git a/app/include/neigh.h b/app/include/neigh.h
deleted file mode 100644
index 18aeae5..0000000
--- a/app/include/neigh.h
+++ /dev/null
@@ -1,53 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include <rte_atomic.h>
-#include <rte_rwlock.h>
-#include <vdev_define.h>
-#include <cJSON.h>
-#include <MESA_htable.h>
-
-struct neighbour;
-TAILQ_HEAD(neighbour_list, neighbour);
-
-struct neighbour_manager
-{
- /* 读写锁 */
- rte_spinlock_t lock;
- /* 查询哈希表 */
- MESA_htable_handle table;
- /* 待查询表项队列 */
- struct neighbour_list in_none_list;
- /* 待查询表项队列长度(不受rwlock的保护) */
- rte_atomic16_t in_none_list_len;
- /* 最大表项数量 */
- unsigned int max_entries;
- /* 最大待查询表项数 */
- unsigned int max_queue_entries;
- /* 表项老化时间 */
- unsigned short t_timeout;
- /* ARP发送频率 */
- unsigned short t_arp_send;
-};
-
-int neighbour_mamanger_init(struct neighbour_manager * object,
- const char * symbol, unsigned int max_entries, unsigned short t_timeout, unsigned int t_arp_send);
-
-int neighbour_mamanger_deinit(struct neighbour_manager * object);
-
-int neigh_create_or_update(struct neighbour_manager * object, struct in_addr in_addr,
- struct rte_ether_addr * eth_addr, struct vdev_instance * vdi, unsigned short en_permanent);
-
-int neigh_delete(struct neighbour_manager * object, struct in_addr in_addr);
-
-int neigh_query(struct neighbour_manager * object, struct in_addr in_addr,
- struct rte_ether_addr * out_ether_addr, struct vdev_instance ** vdi);
-
-cJSON * neighbour_manager_monit(struct neighbour_manager * object);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/app/src/arp.c b/app/src/arp.c
deleted file mode 100644
index 538bdca..0000000
--- a/app/src/arp.c
+++ /dev/null
@@ -1,204 +0,0 @@
-/* \brief 简单协议栈ARP/RARP协议处理模块
- *
- * 处理ARP/RARP协议报文数据
- *
- * \author Lu Qiuwen<[email protected]>
- * \date 2016-10-21
- */
-
-#include <arp.h>
-#include <common.h>
-#include <mrapp.h>
-#include <neigh.h>
-#include <netinet/in.h>
-#include <protect.h>
-#include <rte_arp.h>
-#include <rte_malloc.h>
-#include <rte_version.h>
-#include <vdev_define.h>
-
-static struct rte_ether_addr broadcast_hwaddr = {{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}};
-
-static inline int __local_frame_filter(const struct vdev * vdev, const struct rte_ether_addr * ether_addr)
-{
- if (rte_is_broadcast_ether_addr(ether_addr))
- return 1;
- if (rte_is_same_ether_addr(&vdev->ether_addr, ether_addr))
- return 1;
- return 0;
-}
-
-static inline int __local_inaddr_filter(const struct vdev * vdev, const struct in_addr * in_addr)
-{
- return (vdev->in_addr.s_addr == in_addr->s_addr);
-}
-
-static void arp_reply_entry(struct neighbour_manager * neigh_manager, struct vdev_instance * vdi, queue_id_t qid,
- struct rte_mbuf * mbuf, struct rte_arp_hdr * arp_header)
-{
- struct in_addr s_in_addr;
- struct in_addr d_in_addr;
- memcpy(&s_in_addr, &arp_header->arp_data.arp_sip, sizeof(struct in_addr));
- memcpy(&d_in_addr, &arp_header->arp_data.arp_tip, sizeof(struct in_addr));
- struct rte_ether_addr * s_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_sha;
- struct rte_ether_addr * d_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_tha;
-
- // 检测目的IP地址、MAC地址是否是本机的
- if (!__local_frame_filter(vdi->vdev, d_eth_addr))
- goto invalid_frame;
- if (!__local_inaddr_filter(vdi->vdev, &d_in_addr))
- goto invalid_frame;
-
- // TODO: 错误处理
- neigh_create_or_update(neigh_manager, s_in_addr, s_eth_addr, vdi, 0);
-
-invalid_frame:
- return;
-}
-
-static void arp_request_entry(struct neighbour_manager * neigh_manager, struct vdev_instance * vdi, queue_id_t qid,
- struct rte_mbuf * mbuf, struct rte_arp_hdr * arp_header)
-{
- struct in_addr s_in_addr;
- struct in_addr d_in_addr;
- memcpy(&s_in_addr, &arp_header->arp_data.arp_sip, sizeof(struct in_addr));
- memcpy(&d_in_addr, &arp_header->arp_data.arp_tip, sizeof(struct in_addr));
- struct rte_ether_addr * s_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_sha;
- struct rte_ether_addr * d_eth_addr = (struct rte_ether_addr *)&arp_header->arp_data.arp_tha;
-
- // 过滤非广播报文和非目的MAC是本机的报文
- if (!(rte_is_zero_ether_addr(d_eth_addr) || __local_frame_filter(vdi->vdev, d_eth_addr)))
- goto done;
-
- // 根据广播的ARP报文,更新邻居表
- neigh_create_or_update(neigh_manager, s_in_addr, s_eth_addr, vdi, 0);
-
- // 对请求是本机的,进行响应
- if (!__local_inaddr_filter(vdi->vdev, &d_in_addr))
- goto done;
-
- struct rte_mbuf * reply_mbuf = PROTECT_rte_pktmbuf_alloc(vdi->direct_pool);
- if (unlikely(reply_mbuf == NULL))
- goto done;
-
- // 构造以太网头
- struct rte_ether_hdr * ether_hdr =
- (struct rte_ether_hdr *)PROTECT_rte_pktmbuf_append(reply_mbuf, sizeof(struct rte_ether_hdr));
-
-#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION
- rte_ether_addr_copy(&vdi->vdev->ether_addr, &ether_hdr->src_addr);
- rte_ether_addr_copy(&arp_header->arp_data.arp_sha, &ether_hdr->dst_addr);
-#else
- rte_ether_addr_copy(&vdi->vdev->ether_addr, &ether_hdr->s_addr);
- rte_ether_addr_copy(&arp_header->arp_data.arp_sha, &ether_hdr->d_addr);
-#endif
- ether_hdr->ether_type = ntohs(RTE_ETHER_TYPE_ARP);
-
- // 构造ARP应答
- struct rte_arp_hdr * reply_arp_hdr =
- (struct rte_arp_hdr *)PROTECT_rte_pktmbuf_append(reply_mbuf, sizeof(struct rte_arp_hdr));
-
- rte_memcpy(reply_arp_hdr, arp_header, sizeof(struct rte_arp_hdr));
- reply_arp_hdr->arp_opcode = ntohs(RTE_ARP_OP_REPLY);
-
-#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION
- rte_ether_addr_copy(&ether_hdr->src_addr, &reply_arp_hdr->arp_data.arp_sha);
- rte_ether_addr_copy(&ether_hdr->dst_addr, &reply_arp_hdr->arp_data.arp_tha);
-#else
- rte_ether_addr_copy(&ether_hdr->s_addr, &reply_arp_hdr->arp_data.arp_sha);
- rte_ether_addr_copy(&ether_hdr->d_addr, &reply_arp_hdr->arp_data.arp_tha);
-#endif
- reply_arp_hdr->arp_data.arp_sip = vdi->vdev->in_addr.s_addr;
- reply_arp_hdr->arp_data.arp_tip = s_in_addr.s_addr;
-
- // 写应答包到线路
- mrapp_packet_fast_send_burst(vdi, qid, &reply_mbuf, 1);
-
-done:
- return;
-}
-
-int arp_entry(struct mr_instance * instance, struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs_in[],
- int nr_mbufs_in)
-{
- int handled_packets = 0;
-
- for (int i = 0; i < nr_mbufs_in; i++)
- {
- struct rte_mbuf * mbuf = mbufs_in[i];
- if (unlikely(mbuf == NULL))
- continue;
-
- struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
- if (eth_hdr->ether_type != ntohs(RTE_ETHER_TYPE_ARP))
- continue;
-
- struct rte_arp_hdr * arp_hdr =
- rte_pktmbuf_mtod_offset(mbuf, struct rte_arp_hdr *, sizeof(struct rte_ether_hdr));
-
- if (arp_hdr->arp_opcode == ntohs(RTE_ARP_OP_REQUEST))
- arp_request_entry(instance->neigh, vdi, qid, mbuf, arp_hdr);
- else if (arp_hdr->arp_opcode == ntohs(RTE_ARP_OP_REPLY))
- arp_reply_entry(instance->neigh, vdi, qid, mbuf, arp_hdr);
-
- handled_packets++;
- }
-
- return handled_packets;
-}
-
-int arp_request_send(struct vdev_instance * vdi, queue_id_t qid, struct in_addr in_addr)
-{
- struct vdev * dev_info = vdi->vdev;
- if (unlikely(dev_info->enable == 0))
- {
- MR_DEBUG("Send ARP request on disable device %s, failed.", dev_info->symbol);
- return -EINVAL;
- }
-
- if (unlikely(vdi->nr_txstream == 0))
- {
- MR_DEBUG("Send ARP request on recieve only device %s, failed.", dev_info->symbol);
- return -EINVAL;
- }
-
- struct rte_ether_addr * src_hwaddr = &dev_info->ether_addr;
- struct rte_ether_addr * dst_hwaddr = &broadcast_hwaddr;
-
- struct in_addr in_addr_src = dev_info->in_addr;
- struct rte_mbuf * req_mbuf = PROTECT_rte_pktmbuf_alloc(vdi->direct_pool);
- if (unlikely(req_mbuf == NULL))
- return -ENOBUFS;
-
- // 构造以太网头
- struct rte_ether_hdr * ether_hdr =
- (struct rte_ether_hdr *)PROTECT_rte_pktmbuf_append(req_mbuf, sizeof(struct rte_ether_hdr));
-
-#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION
- rte_ether_addr_copy(src_hwaddr, &ether_hdr->src_addr);
- rte_ether_addr_copy(dst_hwaddr, &ether_hdr->dst_addr);
-#else
- rte_ether_addr_copy(src_hwaddr, &ether_hdr->s_addr);
- rte_ether_addr_copy(dst_hwaddr, &ether_hdr->d_addr);
-#endif
- ether_hdr->ether_type = htons(RTE_ETHER_TYPE_ARP);
-
- // 构造ARP请求报文
- struct rte_arp_hdr * arp_hdr =
- (struct rte_arp_hdr *)PROTECT_rte_pktmbuf_append(req_mbuf, sizeof(struct rte_arp_hdr));
-
- arp_hdr->arp_hardware = htons(RTE_ARP_HRD_ETHER);
- arp_hdr->arp_protocol = htons(RTE_ETHER_TYPE_IPV4);
- arp_hdr->arp_hlen = 6;
- arp_hdr->arp_plen = 4;
- arp_hdr->arp_opcode = htons(RTE_ARP_OP_REQUEST);
- arp_hdr->arp_data.arp_sip = in_addr_src.s_addr;
- arp_hdr->arp_data.arp_tip = in_addr.s_addr;
-
- rte_ether_addr_copy(src_hwaddr, &arp_hdr->arp_data.arp_sha);
- memset(&arp_hdr->arp_data.arp_tha, 0, sizeof(arp_hdr->arp_data.arp_tha));
-
- // 写数据包到线路
- mrapp_packet_fast_send_burst(vdi, qid, &req_mbuf, 1);
- return 0;
-} \ No newline at end of file
diff --git a/app/src/icmp.c b/app/src/icmp.c
deleted file mode 100644
index 8ea4f0b..0000000
--- a/app/src/icmp.c
+++ /dev/null
@@ -1,98 +0,0 @@
-
-#include <mrapp.h>
-#include <rte_icmp.h>
-#include <rte_ip.h>
-#include <protect.h>
-#include <rte_version.h>
-
-static void icmp_echo_request_entry(struct vdev_instance* vdi, queue_id_t sid,
- struct rte_mbuf * mbuf, struct in_addr s_in_addr, struct in_addr d_in_addr)
-{
- /* 将原来的报文复制一份 */
- struct rte_mbuf * mbuf_cloned = PROTECT_rte_pktmbuf_alloc(vdi->direct_pool);
- PROTECT_rte_mbuf_unpoison(mbuf_cloned);
-
- char * _pkt_data = rte_pktmbuf_append(mbuf_cloned, rte_pktmbuf_data_len(mbuf));
- rte_memcpy(_pkt_data, rte_pktmbuf_mtod(mbuf, char *), rte_pktmbuf_data_len(mbuf));
-
- struct rte_icmp_hdr * icmp_hdr = rte_pktmbuf_mtod_offset(mbuf_cloned, struct rte_icmp_hdr *,
- sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
-
- uint32_t cksum;
- icmp_hdr->icmp_type = RTE_IP_ICMP_ECHO_REPLY;
- cksum = ~icmp_hdr->icmp_cksum & 0xffff;
- cksum += ~htons(RTE_IP_ICMP_ECHO_REQUEST << 8) & 0xffff;
- cksum += htons(RTE_IP_ICMP_ECHO_REPLY << 8);
- cksum = (cksum & 0xffff) + (cksum >> 16);
- cksum = (cksum & 0xffff) + (cksum >> 16);
- icmp_hdr->icmp_cksum = ~cksum;
-
- struct rte_ipv4_hdr * ip_hdr = rte_pktmbuf_mtod_offset(mbuf_cloned,
- struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
-
- ip_hdr->src_addr = d_in_addr.s_addr;
- ip_hdr->dst_addr = s_in_addr.s_addr;
- ip_hdr->hdr_checksum = 0;
- ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr);
-
- struct rte_ether_addr ether_addr_swap;
- struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf_cloned, struct rte_ether_hdr *);
-
-#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION
- rte_ether_addr_copy(&eth_hdr->src_addr, &ether_addr_swap);
- rte_ether_addr_copy(&eth_hdr->dst_addr, &eth_hdr->src_addr);
- rte_ether_addr_copy(&ether_addr_swap, &eth_hdr->dst_addr);
-#else
- rte_ether_addr_copy(&eth_hdr->s_addr, &ether_addr_swap);
- rte_ether_addr_copy(&eth_hdr->d_addr, &eth_hdr->s_addr);
- rte_ether_addr_copy(&ether_addr_swap, &eth_hdr->d_addr);
-#endif
-
- mrapp_packet_fast_send_burst(vdi, 0, &mbuf_cloned, 1);
- return;
-}
-
-int icmp_entry(struct mr_instance * instance, struct vdev_instance* vdi,
- queue_id_t qid, struct rte_mbuf* mbufs_in[], int nr_mbufs_in)
-{
- int handled_packets = 0;
- struct in_addr s_in_addr;
- struct in_addr d_in_addr;
-
- for (int i = 0; i < nr_mbufs_in; i++)
- {
- struct rte_mbuf * mbuf = mbufs_in[i];
- if (unlikely(mbuf == NULL)) continue;
-
- // 仅处理IPv4报文
- struct rte_ether_hdr * eth_hdr = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
- if (eth_hdr->ether_type != ntohs(RTE_ETHER_TYPE_IPV4)) continue;
-
- // 校验IPv4报文类型
- struct rte_ipv4_hdr * ip_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *,
- sizeof(struct rte_ether_hdr));
-
- if (ip_hdr->next_proto_id != IPPROTO_ICMP) continue;
-
- s_in_addr.s_addr = ip_hdr->src_addr;
- d_in_addr.s_addr = ip_hdr->dst_addr;
-
- struct rte_icmp_hdr * icmp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_icmp_hdr *,
- sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
-
- if (unlikely(vdi->vdev->in_addr.s_addr != ntohl(d_in_addr.s_addr)))
- {
- continue;
- }
-
- // ICMP回显请求,其他请求不处理
- if (icmp_hdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST)
- {
- icmp_echo_request_entry(vdi, qid, mbuf, s_in_addr, d_in_addr);
- }
-
- handled_packets++;
- }
-
- return handled_packets;
-} \ No newline at end of file
diff --git a/app/src/monit.c b/app/src/monit.c
index e543b0f..f0d79ef 100644
--- a/app/src/monit.c
+++ b/app/src/monit.c
@@ -303,14 +303,7 @@ static cJSON * monit_root(struct mr_instance * instance)
cJSON_AddItemToObject(j_root, "raw", monit_vdev(instance));
cJSON_AddItemToObject(j_root, "appstat", monit_app_stat(instance));
cJSON_AddItemToObject(j_root, "dp-trace-path", marsio_dp_trace_monit_loop(instance));
-
- if (instance->neigh != NULL)
- {
- struct cJSON * j_monit_neigh = neighbour_manager_monit(instance->neigh);
- cJSON_AddItemToObject(j_root, "neigh", j_monit_neigh);
- }
-
- return j_root;
+ return j_root;
}
int mrapp_monit_loop(struct mr_instance * instance)
diff --git a/app/src/mrb.c b/app/src/mrb.c
index c6a22c6..31c1102 100644
--- a/app/src/mrb.c
+++ b/app/src/mrb.c
@@ -13,41 +13,16 @@
void * marsio_buff_ctrlzone(marsio_buff_t * mr_buff, uint8_t id)
{
return NULL;
-
- PROTECT_rte_mbuf_unpoison_meta((struct rte_mbuf *)mr_buff);
- struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id);
- assert(id < mrbuf_cz_num(mr_buff));
-
- void * ctrl_addr = mrbuf_cz_data(mr_buff, id);
- unsigned int ctrl_len = cz->size;
-
- MR_ASAN_UNPOISON_MEMORY_REGION(ctrl_addr, ctrl_len);
- return ctrl_addr;
}
void * marsio_buff_ctrlzone_data(marsio_buff_t * mr_buff, uint8_t id, uint8_t * size)
{
return NULL;
-
- struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id);
- assert(id < mrbuf_cz_num(mr_buff));
- *size = cz->size;
- return mrbuf_cz_data(mr_buff, id);
}
void marsio_buff_ctrlzone_set(marsio_buff_t * mr_buff, uint8_t id, void * ptr_data, uint8_t size)
{
return;
-
- PROTECT_rte_mbuf_unpoison_meta((struct rte_mbuf *)mr_buff);
- struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id);
- RTE_SET_USED(cz);
-
- assert(id < mrbuf_cz_num(mr_buff));
- assert(size <= cz->size);
-
- MR_ASAN_UNPOISON_MEMORY_REGION(mrbuf_cz_data(mr_buff, id), cz->size);
- memcpy(mrbuf_cz_data(mr_buff, id), ptr_data, size);
}
void marsio_buff_ctrlzone_reset(marsio_buff_t * mr_buff)
@@ -65,12 +40,6 @@ void marsio_buff_ctrlzone_reset(marsio_buff_t * mr_buff)
void * mr_buffer_ctrlzone(struct rte_mbuf * mr_buff, uint8_t id)
{
return NULL;
-
- assert(id < mrbuf_cz_num(mr_buff));
- struct mrb_zone_idx * cz = mrbuf_cz(mr_buff, id);
-
- MR_ASAN_UNPOISON_MEMORY_REGION(mrbuf_cz_data(mr_buff, id), cz->size);
- return mrbuf_cz_data(mr_buff, id);
}
static struct rte_mempool_cache * mempool_cache_get(struct mr_instance * instance, struct rte_mempool * mp)
@@ -371,7 +340,6 @@ static void __buff_clone_ctrlzone(marsio_buff_t * mc, marsio_buff_t * md)
/* 断言,__mc和__mi同时到达链表的终点,否则说明链表克隆时出现了问题。 */
assert(__mc == NULL && __md == NULL);
- return;
}
static void __memcpy_operator_buff(struct rte_mbuf * __mi, struct rte_mbuf * __md)
@@ -649,7 +617,6 @@ int marsio_buff_is_ctrlbuf(marsio_buff_t * m)
{
struct rte_mbuf * mbuf = (struct rte_mbuf *)m;
struct mrb_metadata * mrb_metadata = (struct mrb_metadata *)mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
-
return mrb_metadata->is_ctrlbuf;
}
@@ -658,8 +625,6 @@ void marsio_buff_set_ctrlbuf(marsio_buff_t * m)
struct rte_mbuf * mbuf = (struct rte_mbuf *)m;
struct mrb_metadata * mrb_metadata = (struct mrb_metadata *)mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
mrb_metadata->is_ctrlbuf = 1;
-
- return;
}
void marsio_pktmbuf_dump(FILE * f, const marsio_buff_t * m, unsigned dump_len)
diff --git a/app/src/neigh.c b/app/src/neigh.c
deleted file mode 100644
index 769031d..0000000
--- a/app/src/neigh.c
+++ /dev/null
@@ -1,317 +0,0 @@
-/* \brief 简单协议栈邻居子系统
-*
-* 处理与主机邻接通信的所需的信息
-*
-* \author Lu Qiuwen<[email protected]>
-* \date 2016-10-21
-*/
-
-
-#include <rte_hash.h>
-#include <rte_rwlock.h>
-#include <rte_atomic.h>
-#include <rte_ether.h>
-#include <rte_ip_frag.h>
-#include <rte_timer.h>
-#include <rte_hash_crc.h>
-#include <rte_errno.h>
-#include <errno.h>
-#include <assert.h>
-#include <common.h>
-#include <neigh.h>
-#include <vdev_define.h>
-#include <arp.h>
-#include <cJSON.h>
-#include <arpa/inet.h>
-
-#include <MESA_htable.h>
-
-enum neigh_state
-{
- NEIGH_IN_NONE = 0,
- NEIGH_IN_COMPLETE = 1,
- NEIGH_IN_USE = 2,
- NEIGH_IN_TIMEOUT = 3,
- NEIGH_IN_DEAD = 4
-};
-
-static const char * str_neigh_state[] =
-{
- [NEIGH_IN_NONE] = "IN_NONE",
- [NEIGH_IN_COMPLETE] = "IN_COMPLETE",
- [NEIGH_IN_USE] = "IN_USE",
- [NEIGH_IN_TIMEOUT] = "IN_TIMEOUT",
- [NEIGH_IN_DEAD] = "IN_DEAD"
-};
-
-/* Neighbour Table Object Rule */
-struct neighbour
-{
- /* 链表项 */
- TAILQ_ENTRY(neighbour) next;
- /* IP地址 */
- struct in_addr in_addr;
- /* 目的MAC地址 */
- struct rte_ether_addr eth_addr;
- /* 设备描述符 */
- struct vdev_instance * vdi;
- /* 表项状态 */
- enum neigh_state state;
- /* 转入当前状态的时间 */
- time_t t_start_state;
- /* 上次发送ARP请求的时间 */
- time_t t_last_request;
- /* 上次收到ARP应答的时间 */
- time_t t_last_update;
- /* 永久标志位 */
- unsigned short en_permanent;
-};
-
-int neighbour_mamanger_init(struct neighbour_manager * object,
- const char * symbol, unsigned int max_entries, unsigned short t_timeout, unsigned int t_arp_send)
-{
- const static unsigned int __opt_disable = 0;
-
- object->table = MESA_htable_born();
- if(unlikely(object->table == NULL))
- {
- MR_INFO("Cannot born MESA_htable(), the ret is NULL. ");
- goto errout;
- }
-
- /* 不使用MESA_htable的线程安全特性,在外层用自旋锁保证线程安全
- * TODO: 使用MESA_htable的线程安全特性,并增加超时淘汰等特性 */
- MESA_htable_set_opt(object->table, MHO_THREAD_SAFE, (void *)&__opt_disable, sizeof(__opt_disable));
- MESA_htable_set_opt(object->table, MHO_HASH_MAX_ELEMENT_NUM, &max_entries, sizeof(max_entries));
-
- int ret = MESA_htable_mature(object->table);
- if(ret < 0)
- {
- MR_INFO("Cannot mature MESA_htable(), the ret is %d. ", ret);
- goto errout;
- }
-
- TAILQ_INIT(&object->in_none_list);
- rte_atomic16_init(&object->in_none_list_len);
- rte_spinlock_init(&object->lock);
-
- object->t_arp_send = (unsigned short)t_arp_send;
- object->t_timeout = t_timeout;
- return 0;
-
-errout:
- if (object->table != NULL) MESA_htable_destroy(object->table, NULL);
- return -1;
-}
-
-int neighbour_mamanger_deinit(struct neighbour_manager * object)
-{
- MESA_htable_destroy(object->table, NULL);
- return 0;
-}
-
-static void __change_neigh_state(struct neighbour * neigh, enum neigh_state target_state)
-{
- if (target_state != neigh->state)
- {
- neigh->state = target_state;
- neigh->t_start_state = time(NULL);
- }
-}
-
-static struct neighbour * __neigh_create_and_join_hash(struct neighbour_manager * object,
- struct in_addr in_addr, struct rte_ether_addr * eth_addr,
- struct vdev_instance * vdi, unsigned short en_permanent)
-{
- struct neighbour * neigh = malloc(sizeof(struct neighbour));
- memset(neigh, 0, sizeof(struct neighbour));
-
- neigh->in_addr = in_addr;
- neigh->vdi = vdi;
- neigh->en_permanent = en_permanent;
- neigh->t_last_update = 0;
- neigh->t_last_request = 0;
- neigh->state = NEIGH_IN_NONE;
-
- if(eth_addr != NULL)
- {
- rte_ether_addr_copy(eth_addr, &neigh->eth_addr);
- __change_neigh_state(neigh, NEIGH_IN_USE);
- }
-
- if(en_permanent)
- {
- neigh->en_permanent = 1;
- }
-
- int ret = MESA_htable_add(object->table, (const unsigned char *)&in_addr, sizeof(in_addr), neigh);
- if (unlikely(ret < 0))
- {
- MR_ERROR("Insert neigh to hash table failed, ret = %d", ret);
- goto errout;
- }
-
- return neigh;
-
-errout:
- if (neigh) free(neigh);
- return NULL;
-}
-
-static void __neigh_update(struct neighbour * neigh, struct in_addr in_addr,
- struct rte_ether_addr * eth_addr, struct vdev_instance * vdi, unsigned short en_permanent)
-{
- assert(neigh != NULL);
- neigh->in_addr = in_addr;
- neigh->en_permanent = en_permanent;
- neigh->vdi = vdi;
-
- if(eth_addr != NULL)
- {
- rte_ether_addr_copy(eth_addr, &neigh->eth_addr);
- __change_neigh_state(neigh, NEIGH_IN_USE);
- }
-
- return;
-}
-
-int neigh_create_or_update(struct neighbour_manager * object, struct in_addr in_addr,
- struct rte_ether_addr * eth_addr, struct vdev_instance * vdi, unsigned short en_permanent)
-{
- rte_spinlock_lock(&object->lock);
- struct neighbour * neigh = MESA_htable_search(object->table, (const unsigned char *)&in_addr, sizeof(in_addr));
- if (unlikely(neigh == NULL))
- {
- neigh = __neigh_create_and_join_hash(object, in_addr, eth_addr, vdi, en_permanent);
- }
- else
- {
- __neigh_update(neigh, in_addr, eth_addr, vdi, en_permanent);
- }
-
- int ret = 0;
- if (unlikely(neigh == NULL)) { ret = -1; goto exit; }
-
- time_t now_time = time(NULL);
- if (neigh->state == NEIGH_IN_NONE && (now_time - neigh->t_last_request) >= object->t_arp_send)
- {
- arp_request_send(vdi, 0, neigh->in_addr);
- neigh->t_last_request = now_time;
- }
-
- ret = 0; goto exit;
-
-exit:
- rte_spinlock_unlock(&object->lock);
- return ret;
-}
-
-int neigh_delete(struct neighbour_manager * object, struct in_addr in_addr)
-{
- rte_spinlock_lock(&object->lock);
- struct neighbour * neigh = MESA_htable_search(object->table, (const unsigned char *)&in_addr, sizeof(in_addr));
- if (neigh == NULL) goto exit;
-
- // 当处于IN_NONE状态时,从IN_NONE列表中移除
- if (neigh->state == NEIGH_IN_NONE)
- {
- TAILQ_REMOVE(&object->in_none_list,neigh, next);
- rte_atomic16_dec(&object->in_none_list_len);
- }
-
- MESA_htable_del(object->table, (const unsigned char *)&in_addr, sizeof(in_addr), NULL);
- return 0;
-
-exit:
- rte_spinlock_unlock(&object->lock);
- return -1;
-}
-
-int neigh_query(struct neighbour_manager * object, struct in_addr in_addr,
- struct rte_ether_addr * out_ether_addr, struct vdev_instance ** vdi)
-{
- rte_spinlock_lock(&object->lock);
- struct neighbour * neigh = MESA_htable_search(object->table, (const unsigned char *)&in_addr, sizeof(in_addr));
-
- // 没查到
- int ret = 0;
- if (neigh == NULL) goto exit;
-
- // 以下状态信息不全,无法对外提供查询
- if (neigh->state == NEIGH_IN_NONE ||
- neigh->state == NEIGH_IN_COMPLETE ||
- neigh->state == NEIGH_IN_DEAD)
- {
- ret = -EFAULT; goto exit;
- }
-
- rte_ether_addr_copy(&neigh->eth_addr, out_ether_addr);
- *vdi = neigh->vdi;
- ret = 0; goto exit;
-
-exit:
- rte_spinlock_unlock(&object->lock);
- return ret;
-}
-
-static void __neigh_handle_in_no_queue(struct neighbour_manager * object, queue_id_t qid)
-{
- if (rte_atomic16_read(&object->in_none_list_len) == 0) return;
-
- rte_spinlock_lock(&object->lock);
- struct neighbour * neigh_iter;
-
- // 对每一个在In-None列表中的Neigh,发ARP请求。
- TAILQ_FOREACH(neigh_iter, &object->in_none_list, next)
- {
- assert(neigh_iter->vdi != NULL);
- arp_request_send(neigh_iter->vdi, qid, neigh_iter->in_addr);
-
- // 处理完毕,移除这一项,因哈希表中还有引用,不释放空间
- TAILQ_REMOVE(&object->in_none_list, neigh_iter, next);
- rte_atomic16_dec(&object->in_none_list_len);
- }
-
- // In-None列表应当为空,数量计数应当为0
- assert(rte_atomic16_read(&object->in_none_list_len) == 0);
- rte_spinlock_unlock(&object->lock);
-}
-
-void neighbour_manager_loop_entry(struct neighbour_manager * object, queue_id_t qid)
-{
- __neigh_handle_in_no_queue(object, qid);
-}
-
-void __neighbour_manager_monit_iterate(const uchar * key, uint size, void * data, void *user)
-{
- struct neighbour * iter_neigh = (struct neighbour *)data;
- struct cJSON * j_root = (struct cJSON *)user;
- assert(iter_neigh != NULL && j_root != NULL);
-
- char str_in_addr[MR_STRING_MAX] = { 0 };
- inet_ntop(AF_INET, &iter_neigh->in_addr, str_in_addr, INET_ADDRSTRLEN);
-
- char str_ether_addr[MR_STRING_MAX] = { 0 };
- rte_ether_format_addr(str_ether_addr, sizeof(str_ether_addr), &iter_neigh->eth_addr);
-
- cJSON * j_neigh = cJSON_CreateObject();
- cJSON_AddStringToObject(j_neigh, "InAddress", str_in_addr);
- cJSON_AddStringToObject(j_neigh, "HWAddress", str_ether_addr);
- cJSON_AddStringToObject(j_neigh, "Interface", iter_neigh->vdi->vdev->symbol);
- cJSON_AddStringToObject(j_neigh, "State", str_neigh_state[iter_neigh->state]);
-
- cJSON_AddItemToArray(j_root, j_neigh);
- (void)key;
- (void)size;
-}
-
-cJSON * neighbour_manager_monit(struct neighbour_manager * object)
-{
- struct cJSON * j_root = cJSON_CreateArray();
- MR_VERIFY_MALLOC(j_root);
-
- rte_spinlock_lock(&object->lock);
- MESA_htable_iterate(object->table, __neighbour_manager_monit_iterate, j_root);
- rte_spinlock_unlock(&object->lock);
- return j_root;
-} \ No newline at end of file
diff --git a/app/src/rawio.c b/app/src/rawio.c
index 2c81576..d322b49 100644
--- a/app/src/rawio.c
+++ b/app/src/rawio.c
@@ -23,10 +23,9 @@ static inline unsigned int packet_total_len(struct rte_mbuf * mbufs[], unsigned
return total_len;
}
-int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs)
+int mrapp_packet_fast_send_burst(struct mr_instance * instance,
+ struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs)
{
- PROTECT_rte_mbuf_unpoison_bulk(mbufs, nr_mbufs);
-
hash_t mbufs_hash[MR_BURST_MAX];
for (int i = 0; i < nr_mbufs; i++)
{
@@ -40,12 +39,19 @@ int mrapp_packet_fast_send_burst(struct vdev_instance * vdi, queue_id_t qid, str
*/
// TODO: 锁换一个位置
- static rte_spinlock_t __f_fast_lock = {0};
+ static rte_spinlock_t _f_fast_lock = {0};
PROTECT_rte_mbuf_poison_bulk(mbufs, nr_mbufs);
- rte_spinlock_lock(&__f_fast_lock);
- int ret = vnode_mirror_enqueue_bulk(vdi->vnode_ftx_prod, qid, mbufs, mbufs_hash, nr_mbufs);
- rte_spinlock_unlock(&__f_fast_lock);
+ rte_spinlock_lock(&_f_fast_lock);
+ vnode_mirror_enqueue_bulk(vdi->vnode_ftx_prod, qid, mbufs, mbufs_hash, nr_mbufs);
+
+ int ret = vnode_mirror_rt_object_retrieve(vdi->vnode_ftx_prod, qid, mbufs, nr_mbufs);
+ if (ret > 0)
+ {
+ marsio_buff_free_v2(instance, (marsio_buff_t **)mbufs, ret);
+ }
+
+ rte_spinlock_unlock(&_f_fast_lock);
return ret;
}
@@ -96,6 +102,7 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
for (int i = 0; i < nr_mbufs_out; i++)
{
mbufs[i] = rx_buffer->mbufs[rx_buffer->curser + i];
+
if (unlikely(marsio_dp_trace_record_can_emit(mbufs[i])))
{
marsio_dp_trace_record_emit_fmt(vdev->instance, mbufs[i], "marsio_recv", "packet rx, dev=%s, qid=%u",
@@ -124,6 +131,8 @@ int marsio_recv_all_burst(struct mr_instance * instance, queue_id_t qid, marsio_
int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid)
{
struct mr_vdev_tx_buffer * tx_buffer = vdev->tx_buffer[sid];
+ struct mr_instance * instance = vdev->instance;
+
if (tx_buffer->length == 0)
{
return 0;
@@ -132,25 +141,27 @@ int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid)
hash_t hash[MR_BURST_MAX];
for (int i = 0; i < tx_buffer->length; i++)
{
- // uint16_t hash_txq = tx_buffer->mbufs[i]->hash.txadapter.txq;
- // uint32_t hash_usr = tx_buffer->mbufs[i]->hash.usr;
-
- /* if the hash_qid's highest bit is set, use the txq */
-#if 1
- // hash[i] = hash_txq & 0x8000 ? hash_txq & 0x7FFF : hash_usr;
hash[i] = tx_buffer->mbufs[i]->hash.usr;
if (marsio_dp_trace_record_can_emit(tx_buffer->mbufs[i]))
{
marsio_dp_trace_record_emit_fmt(vdev->instance, tx_buffer->mbufs[i], "marsio_send",
"packet tx, dev=%s , qid=%u, hash=%u", vdev->devsym, sid, hash[i]);
}
-#else
- /* round-robin */
- hash[i] = round_robin_counter++;
-#endif
}
int ret = vnode_mirror_enqueue_bulk(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs, hash, (int)tx_buffer->length);
+ if (ret < 0)
+ {
+ return ret;
+ }
+
+ /* free the mbufs backpressure from service */
+ ret = vnode_mirror_rt_object_retrieve(vdev->vdi->vnode_tx_prod, sid, tx_buffer->mbufs, (int)tx_buffer->length);
+ if (ret > 0)
+ {
+ marsio_buff_free_v2(instance, (marsio_buff_t **)tx_buffer->mbufs, ret);
+ }
+
tx_buffer->length = 0;
return ret;
}
@@ -186,21 +197,21 @@ int marsio_send_burst(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff
int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid, marsio_buff_t * mbufs[], int nr_mbufs,
uint16_t options)
{
- struct rte_mbuf ** __mbufs = (struct rte_mbuf **)mbufs;
- PROTECT_rte_mbuf_unpoison_bulk(__mbufs, nr_mbufs);
+ struct rte_mbuf ** _mbufs = (struct rte_mbuf **)mbufs;
+ PROTECT_rte_mbuf_unpoison_bulk(_mbufs, nr_mbufs);
assert(sid < sendpath->target_vdi->nr_txstream);
for (int i = 0; i < nr_mbufs; i++)
{
- assert(__mbufs[i]->data_len != 0 && __mbufs[i]->pkt_len != 0);
+ assert(_mbufs[i]->data_len != 0 && _mbufs[i]->pkt_len != 0);
__rte_mbuf_sanity_check(mbufs[i], i);
}
if (options & MARSIO_SEND_OPT_NO_FREE)
{
for (int i = 0; i < nr_mbufs; i++)
- rte_pktmbuf_refcnt_update(__mbufs[i], 1);
+ rte_pktmbuf_refcnt_update(_mbufs[i], 1);
}
if (options & MARSIO_SEND_OPT_REHASH)
@@ -212,7 +223,7 @@ int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid
hash_t hash[MR_BURST_MAX];
for (int i = 0; i < nr_mbufs; i++)
{
- hash[i] = __mbufs[i]->hash.usr;
+ hash[i] = _mbufs[i]->hash.usr;
}
/* 线程运行情况统计 */
@@ -220,10 +231,16 @@ int marsio_send_burst_with_options(struct mr_sendpath * sendpath, queue_id_t sid
{
thread_id_t tid = thread_info.thread_id;
thread_info.instance->stat[tid].packet_send_count += nr_mbufs;
- thread_info.instance->stat[tid].packet_send_length = packet_total_len(__mbufs, nr_mbufs);
+ thread_info.instance->stat[tid].packet_send_length = packet_total_len(_mbufs, nr_mbufs);
+ }
+
+ vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, _mbufs, hash, nr_mbufs);
+ int ret = vnode_mirror_rt_object_retrieve(sendpath->target_vdi->vnode_tx_prod, sid, _mbufs, nr_mbufs);
+ if (ret > 0)
+ {
+ marsio_buff_free_v2(sendpath->instance, mbufs, ret);
}
- vnode_mirror_enqueue_bulk(sendpath->target_vdi->vnode_tx_prod, sid, __mbufs, hash, nr_mbufs);
return RT_SUCCESS;
}
@@ -285,7 +302,7 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un
return 0;
}
- /* set the notify status to sleep */
+ /* set the notification status to sleep */
for (unsigned int i = 0; i < nr_vdevs; i++)
{
struct vnode_cons_notify * cons_notify = vdevs[i]->vdi->vnode_rx_cons_notify;
@@ -313,7 +330,7 @@ int marsio_poll_wait(struct mr_instance * instance, struct mr_vdev * vdevs[], un
clear_all_zero_recv_counters(vdevs, nr_vdevs, tid);
/* read the data in pipe, and drop it */
- /* handle the read event, read the packet then redirect to shmdev queues */
+ /* handle the read event, read the packet, then redirect to shmdev queues */
for (int i = 0; i < n; i++)
{
struct rte_epoll_event * epoll_event = &epoll_events[i];
diff --git a/app/src/sendpath.c b/app/src/sendpath.c
index 5306132..453be71 100644
--- a/app/src/sendpath.c
+++ b/app/src/sendpath.c
@@ -105,245 +105,9 @@ static struct mr_sendpath * sendpath_vdev_create(struct mr_instance * instance,
return &_sendpath->_father;
}
-/* ======================================================================================= */
-
-static void sendpath_route_destory(struct mr_sendpath * sendpath)
-{
- struct __mr_sendpath_route * _sendpath = container_of(sendpath,
- struct __mr_sendpath_route, _father);
-
- free(_sendpath);
-}
-
-static int sendpath_route_l2_construct(struct mr_sendpath * sendpath,
- struct rte_mbuf * mbuf[], unsigned int nr_mbuf)
-{
- struct __mr_sendpath_route * _sendpath = container_of(sendpath,
- struct __mr_sendpath_route, _father);
-
- for (int i = 0; i < nr_mbuf; i++)
- {
- struct rte_ether_hdr * ether_hdr = (struct rte_ether_hdr *)rte_pktmbuf_prepend(mbuf[i],
- sizeof(struct rte_ether_hdr));
-
- MR_VERIFY_2(ether_hdr != NULL, "Not enough space for ethernet header in mbufs. ");
-
-#if RTE_VERSION_NUM(21, 11, 0, 0) <= RTE_VERSION
- ether_hdr->src_addr = _sendpath->src_eth_addr;
- ether_hdr->dst_addr = _sendpath->dst_eth_addr;
-#else
- ether_hdr->s_addr = _sendpath->src_eth_addr;
- ether_hdr->d_addr = _sendpath->dst_eth_addr;
-#endif
- ether_hdr->ether_type = htons(RTE_ETHER_TYPE_IPV4);
- }
-
- return 0;
-}
-
-static int sendpath_route_l3_construct(struct mr_sendpath * sendpath,
- struct rte_mbuf * mbuf[], unsigned int nr_mbuf)
-{
- struct __mr_sendpath_route * _sendpath = container_of(sendpath,
- struct __mr_sendpath_route, _father);
-
- for (int i = 0; i < nr_mbuf; i++)
- {
- struct rte_ipv4_hdr * ip_hdr = rte_pktmbuf_mtod(mbuf[i], struct rte_ipv4_hdr *);
- ip_hdr->src_addr = _sendpath->src_addr.s_addr;
- ip_hdr->dst_addr = _sendpath->dst_addr.s_addr;
-
- ip_hdr->hdr_checksum = 0;
- ip_hdr->hdr_checksum = rte_ipv4_cksum(ip_hdr);
- }
-
- return 0;
-}
-
-static int sendpath_route_requery(struct mr_sendpath * sendpath)
-{
- struct mr_instance * instance = sendpath->instance;
- struct __mr_sendpath_route * _sendpath = container_of(sendpath,
- struct __mr_sendpath_route, _father);
-
- int ret = neigh_query(instance->neigh, _sendpath->target_addr,
- &_sendpath->dst_eth_addr, &sendpath->target_vdi);
-
- if (ret < 0)
- {
- neigh_create_or_update(instance->neigh, _sendpath->target_addr, NULL,
- sendpath->target_vdi, 0); return RT_ERR;
- }
- else
- {
- sendpath->can_use = 1;
- return RT_SUCCESS;
- }
-
- return RT_SUCCESS;
-}
-
-static int sendpath_route_option_set(struct mr_instance * instance, struct mr_sendpath * sendpath,
- int opt, va_list va_list)
-{
- if (opt == MR_SENDPATH_OPT_BUILD_L2)
- {
- unsigned int enable = va_arg(va_list, unsigned int);
- sendpath->fn_l2_construct = enable ? sendpath_route_l2_construct : NULL;
- }
- else if (opt == MR_SENDPATH_OPT_BUILD_L3)
- {
- unsigned int enable = va_arg(va_list, unsigned int);
- sendpath->fn_l3_construct = enable ? sendpath_route_l3_construct : NULL;
- }
- else if (opt == MR_SENDPATH_OPT_HOOK_PREBUILD)
- {
- sendpath->fn_prebuild_hook = va_arg(va_list, void *);
- sendpath->prebuild_hook_args = va_arg(va_list, void *);
- }
- else if (opt == MR_SENDPATH_OPT_HOOK_POSTBUILD)
- {
- sendpath->fn_postbuild_hook = va_arg(va_list, void *);
- sendpath->postbuild_hook_args = va_arg(va_list, void *);
- }
- else
- {
- MR_ERROR("Invalided opt type in %s()", __FUNCTION__);
- return -EINVAL;
- }
-
- return 0;
-}
-
-static int sendpath_route_option_get(struct mr_instance * instance, struct mr_sendpath * sendpath,
- int opt, va_list va_list)
-{
- return 0;
-}
-
static struct mr_sendpath * sendpath_route_create(struct mr_instance * instance, int type, va_list va_list)
{
- struct mr_vdev * target_vdev = NULL;
- struct in_addr in_addr;
-
- /* 对于一般的路由,传入目标地址 */
- if (type == MR_SENDPATH_ROUTE_NORMAL)
- {
- target_vdev = NULL;
- in_addr = va_arg(va_list, struct in_addr);
- }
-
- /* 对于指定出口设备的路由,传入(1)设备句柄(2)目标IP地址 */
- else if (type == MR_SENDPATH_ROUTE_SPEC_DEV)
- {
- target_vdev = va_arg(va_list, struct mr_vdev *);
- in_addr = va_arg(va_list, struct in_addr);
- }
- else
- {
- assert(0);
- }
-
- struct in_addr target_in_addr = { INADDR_NONE };
-
- /* 查找合适的设备 */
- if (target_vdev != NULL) goto _build;
-
- for (int i = 0; i < instance->nr_vdevs; i++)
- {
- struct mr_vdev * vdev = &instance->vdevs[i];
- struct vdev * __vdev = vdev->vdi->vdev;
-
- if (!is_same_subnet(in_addr, __vdev->in_addr, __vdev->in_mask))
- continue;
-
- target_in_addr = in_addr;
- target_vdev = vdev;
- }
-
-_build:
-
- /* 没有找到设备,说明没有路由 */
- if (target_vdev == NULL)
- {
- char str_in_addr[MR_STRING_MAX] = { 0 };
- inet_ntop(AF_INET, &in_addr, str_in_addr, INET_ADDRSTRLEN);
-
- MR_ERROR("No route to address %s, creating route sendpath failed. ",
- str_in_addr); return NULL;
- }
-
- struct vdev * __target_vdev = target_vdev->vdi->vdev;
-
- /* 判断目标IP地址是否属于网卡所在的网段,如不在,走网关 */
- if (is_same_subnet(in_addr, __target_vdev->in_addr, __target_vdev->in_mask))
- {
- target_in_addr = in_addr;
- }
- else
- {
- target_in_addr = __target_vdev->in_gateway;
- }
-
- /* 目的地址不合法,返回 */
- if (target_in_addr.s_addr == htonl(INADDR_ANY) ||
- target_in_addr.s_addr == htonl(INADDR_LOOPBACK) ||
- target_in_addr.s_addr == htonl(INADDR_NONE))
- {
- char str_target_in_addr[MR_STRING_MAX] = { 0 };
- char str_in_addr[MR_STRING_MAX] = { 0 };
-
- inet_ntop(AF_INET, &target_in_addr, str_target_in_addr, INET_ADDRSTRLEN);
- inet_ntop(AF_INET, &in_addr, str_in_addr, INET_ADDRSTRLEN);
-
- MR_WARNING("Invalid target ip address %s(or next hop address), "
- "creating route sendpath for %s failed. ", str_target_in_addr,
- str_in_addr); return NULL;
- }
-
- struct __mr_sendpath_route * _sendpath = malloc(sizeof(struct __mr_sendpath_route));
- memset(_sendpath, 0, sizeof(struct __mr_sendpath_route));
-
- struct mr_sendpath * sendpath = &_sendpath->_father;
-
- /* 填充SendPath各虚函数指针*/
- sendpath->instance = target_vdev->instance;
- sendpath->target_vdi = target_vdev->vdi;
- sendpath->vdev = target_vdev;
-
- sendpath->fn_requery = sendpath_route_requery;
- sendpath->fn_l2_construct = sendpath_route_l2_construct;
- sendpath->fn_l3_construct = sendpath_route_l3_construct;
- sendpath->fn_l4_construct = NULL;
-
- sendpath->fn_option_get = sendpath_route_option_get;
- sendpath->fn_option_set = sendpath_route_option_set;
- sendpath->fn_destory = sendpath_route_destory;
-
- _sendpath->src_addr = sendpath->target_vdi->vdev->in_addr;
- _sendpath->dst_addr = in_addr;
- _sendpath->target_addr = target_in_addr;
- rte_ether_addr_copy(&__target_vdev->ether_addr, &_sendpath->src_eth_addr);
-
- /* 查ARP表 */
- int ret = neigh_query(target_vdev->instance->neigh, target_in_addr,
- &_sendpath->dst_eth_addr, &sendpath->target_vdi);
-
- /* 没有查询成功,保存起来下一次再查 */
- if (ret < 0)
- {
- neigh_create_or_update(target_vdev->instance->neigh, target_in_addr,
- NULL, target_vdev->vdi, 0); sendpath->can_use = 0;
- }
- else
- {
- sendpath->can_use = 1;
- }
-
- MR_DEBUG("Sendpath created: type=%d, in_addr=%u, target_in_addr=%u", type,
- in_addr.s_addr, target_in_addr.s_addr);
-
- return sendpath;
+ return NULL;
}
/* ======================================================================================== */
diff --git a/app/src/tap.c b/app/src/tap.c
index edc41a3..12994ad 100644
--- a/app/src/tap.c
+++ b/app/src/tap.c
@@ -331,7 +331,7 @@ static void * tap_representor_poll_thread_entry(void * arg)
goto errout;
}
- /* handle the read event, read the packet then redirect to shmdev queues */
+ /* handle the read event, read the packet, then redirect to shmdev queues */
for (int i = 0; i < n; i++)
{
struct tap_device * tap_dev = (struct tap_device *)(epoll_events[i].epdata.data);
@@ -345,10 +345,10 @@ static void * tap_representor_poll_thread_entry(void * arg)
continue;
}
- ret = mrapp_packet_fast_send_burst(vdev->vdi, 0, (struct rte_mbuf **)buff, RTE_DIM(buff));
+ ret = mrapp_packet_fast_send_burst(mr_instance, vdev->vdi, 0, (struct rte_mbuf **)buff, RTE_DIM(buff));
if (unlikely(ret < 0))
{
- marsio_buff_free(mr_instance, buff, RTE_DIM(buff), MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY);
+ marsio_buff_free_v2(mr_instance, buff, RTE_DIM(buff));
}
}
}
diff --git a/include/external/marsio.h b/include/external/marsio.h
index ba3af46..a928f00 100644
--- a/include/external/marsio.h
+++ b/include/external/marsio.h
@@ -267,6 +267,8 @@ int marsio_buff_malloc_global(struct mr_instance * instance, marsio_buff_t * mar
void marsio_buff_free(struct mr_instance * instance, marsio_buff_t * marsio_buff[], unsigned int nr_mbufs,
int socket_id, int thread_id);
+void marsio_buff_free_v2(struct mr_instance * instance, marsio_buff_t * buff[], unsigned int nr_buffs);
+
void marsio_buff_do_rehash(struct mr_instance * mr_instance, marsio_buff_t * m);
int marsio_buff_is_ctrlbuf(marsio_buff_t * m);
diff --git a/infra/include/vnode.h b/infra/include/vnode.h
index 557fb76..95dd8e5 100644
--- a/infra/include/vnode.h
+++ b/infra/include/vnode.h
@@ -108,22 +108,23 @@ struct vnode;
struct vnode_prod;
struct vnode_cons;
-int vnode_mirror_enqueue_bulk(struct vnode_prod * prod,
- unsigned int prodq, struct rte_mbuf * objects[], uint32_t hash[], int nr_objects);
+int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[],
+ uint32_t hash[], unsigned int nr_objects);
-int vnode_mirror_dequeue_burst(struct vnode_cons * cons,
- unsigned int consq, struct rte_mbuf * objects[], int nr_max_objects);
+int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[],
+ int nr_max_objects);
-struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_shared,
- unsigned int sz_buffer, unsigned int notify_cons_when_rx,
- unsigned int batch_interval_us, unsigned int en_q_len_monitor);
+int vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[],
+ unsigned int nr_max_objects);
+
+struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_buffer,
+ unsigned int notify_cons_when_rx, unsigned int batch_interval_us,
+ unsigned int en_q_len_monitor);
int vnode_mirror_delete(struct vnode * vnode);
void vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq);
-void vnode_mirror_common_unpoison(struct vnode * vnode);
-
__DECLARE_COMMON_VNODE_CREATE_PROD(mirror)
__DECLARE_COMMON_VNODE_CREATE_CONS(mirror)
__DECLARE_COMMON_VNODE_DELETE_PROD(mirror)
diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c
index 5c4bfda..a6653ff 100644
--- a/infra/src/vnode_common.c
+++ b/infra/src/vnode_common.c
@@ -42,30 +42,31 @@
*/
static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_exclusive, unsigned int sz_shared,
- unsigned int sz_buffer, rte_atomic32_t * shared_counter)
+ unsigned int sz_buffer)
{
struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc));
MR_VERIFY_MALLOC(desc);
desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY,
- RING_F_SC_DEQ | RING_F_SP_ENQ | RING_F_EXACT_SZ);
+ RING_F_SC_DEQ | RING_F_SP_ENQ);
if (desc->tunnel_object == NULL)
{
- MR_ERROR("Create tunnel %s failed : %s", symbol, MR_STR_ERRNO(errno));
+ MR_ERROR("Create tunnel %s failed : %s", symbol, strerror(errno));
goto errout;
}
snprintf(desc->symbol, sizeof(desc->symbol), "%s", symbol);
desc->tunnel_size = sz_exclusive + sz_shared;
- desc->tunnel_exclusive_size = sz_exclusive;
- desc->shared_credict_counter = shared_counter;
- desc->shared_credict_used = 0;
-
desc->en_buffer = ZMALLOC(sizeof(void *) * MR_LIBVNODE_MAX_SZ_BURST);
MR_VERIFY_MALLOC(desc->en_buffer);
+
+ desc->rt_buffer = ZMALLOC(sizeof(void *) * MR_LIBVNODE_MAX_SZ_BURST);
+ MR_VERIFY_MALLOC(desc->rt_buffer);
+
desc->sz_en_buffer = MR_LIBVNODE_MAX_SZ_BURST;
+ desc->sz_rt_buffer = MR_LIBVNODE_MAX_SZ_BURST;
#if VNODE_CHECK_THREAD_SAFE
rte_spinlock_init(&desc->lock_thread_safe_check);
@@ -85,6 +86,11 @@ errout:
FREE(&desc->en_buffer);
}
+ if (desc->rt_buffer != NULL)
+ {
+ FREE(&desc->rt_buffer);
+ }
+
FREE(&desc);
return NULL;
}
@@ -93,14 +99,19 @@ errout:
static int tunnel_delete(struct tunnel_desc * desc)
{
for (int i = 0; i < desc->sz_en_buffer_used; i++)
+ {
infra_rte_pktmbuf_free(desc->en_buffer[i]);
+ }
struct rte_mbuf * mbuf;
while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0)
+ {
infra_rte_pktmbuf_free(mbuf);
+ }
MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol);
rte_free(desc->en_buffer);
+ rte_free(desc->rt_buffer);
rte_ring_free(desc->tunnel_object);
rte_free(desc);
return 0;
@@ -124,8 +135,7 @@ static int tunnel_block_delete(struct tunnel_block * block)
/* Alloc a block of tunnels, and init all the tunnels */
static struct tunnel_block * tunnel_block_new(const char * symbol, struct vnode_prod * prod, struct vnode_cons * cons,
- unsigned int sz_exclusive, unsigned int sz_shared, unsigned int sz_buffer,
- rte_atomic32_t * shared_counter)
+ unsigned int sz_exclusive, unsigned int sz_shared, unsigned int sz_buffer)
{
unsigned int nr_prodq = prod->nr_prodq;
unsigned int nr_consq = cons->nr_consq;
@@ -148,7 +158,7 @@ static struct tunnel_block * tunnel_block_new(const char * symbol, struct vnode_
char tunnel_sym[MR_SYMBOL_MAX];
snprintf(tunnel_sym, sizeof(tunnel_sym), "%s-%d-%d", symbol, prodq_id, consq_id);
- struct tunnel_desc * tdesc = tunnel_new(tunnel_sym, sz_exclusive, sz_shared, sz_buffer, shared_counter);
+ struct tunnel_desc * tdesc = tunnel_new(tunnel_sym, sz_exclusive, sz_shared, sz_buffer);
if (tdesc == NULL)
goto err;
@@ -176,8 +186,7 @@ static int do_producer_join_unsafe(struct vnode * vnode, struct vnode_prod * pro
snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol);
// create communication tunnel for each cons and prods
- block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer,
- &vnode->shared_credict_counter);
+ block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer);
if (block == NULL)
{
@@ -205,11 +214,7 @@ static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * con
{
char block_sym[MR_SYMBOL_MAX];
snprintf(block_sym, sizeof(block_sym) - 1, "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol);
-
- // create a communication tunnel for all cons and prods
- rte_atomic32_t * shared_credict_counter = vnode->sz_shared > 0 ? &vnode->shared_credict_counter : NULL;
- block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer,
- shared_credict_counter);
+ block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer);
if (block == NULL)
{
@@ -221,9 +226,6 @@ static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * con
}
vnode->cons = cons;
-
- /* reset the shared credict counter */
- rte_atomic32_set(&vnode->shared_credict_counter, (int32_t)vnode->sz_shared);
return 0;
error:
@@ -276,11 +278,7 @@ struct vnode_prod * __vnode_common_create_prod(struct vnode * vnode, const char
return prod;
err:
- if (prod != NULL)
- {
- rte_free(prod);
- }
-
+ rte_free(prod);
return NULL;
}
diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h
index bc2eef7..25808b4 100644
--- a/infra/src/vnode_common.h
+++ b/infra/src/vnode_common.h
@@ -30,34 +30,22 @@ struct tunnel_desc
struct rte_ring * tunnel_object;
/* Tunnel Size */
unsigned int tunnel_size;
- /* tunnel exclusive size */
- unsigned int tunnel_exclusive_size;
/* Tunnel Enqueue Buffer */
struct rte_mbuf ** en_buffer;
-
-#if 0
- /* Tunnel Enqueue Buffer Size */
- struct rte_mbuf ** en_returned_buffer;
-#endif
+ struct rte_mbuf ** rt_buffer;
/* second cacheline, read/write */
RTE_MARKER cacheline1 __rte_cache_min_aligned;
+ /* Tunnel Enqueue Buffer Size */
unsigned int sz_en_buffer;
/* Tunnel Enqueue Buffer Used */
unsigned int sz_en_buffer_used;
- /* shared tunnel use */
- unsigned int shared_credict_used;
- /* counter */
- rte_atomic32_t * shared_credict_counter;
-
-#if 0
- /* Tunnel Enqueue Buffer Returned */
- unsigned int sz_en_buffer_returned;
- /* Tunnel Enqueue Buffer Returned */
- unsigned int sz_en_buffer_returned_used;
-#endif
+ /* Return Buffer Size */
+ unsigned int sz_rt_buffer;
+ /* Return Buffer Used */
+ unsigned int sz_rt_buffer_used;
#if VNODE_CHECK_THREAD_SAFE
/* For debug, to check concurrent access of en_buffer */
@@ -148,7 +136,6 @@ struct vnode
/* Guarantees one operator(consumer or producer, create or destroy) a time */
rte_spinlock_t lock __rte_cache_aligned;
- rte_atomic32_t shared_credict_counter __rte_cache_aligned;
};
#ifndef MR_LIBVNODE_MAX_SZ_BURST
diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c
index 381eeaf..2ae08a2 100644
--- a/infra/src/vnode_mirror.c
+++ b/infra/src/vnode_mirror.c
@@ -1,10 +1,3 @@
-
-/* Virtual Data Deliver Node - Packet Deliver Class
- Author : Lu Qiuwen<[email protected]>
- Zheng Chao<[email protected]>
- Date : 2017-01-09
-*/
-
#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_version.h>
@@ -14,6 +7,20 @@
#include "dp_trace.h"
#include "vnode_common.h"
+static inline unsigned int dist_tunnel_rt_objects_retrieve(struct tunnel_desc * desc, struct rte_mbuf * rt_objs[],
+ unsigned int nr_max_rt_objs)
+{
+ assert(desc->sz_rt_buffer_used <= desc->sz_rt_buffer);
+ unsigned int nr_rt_objs = desc->sz_rt_buffer_used;
+ for (unsigned int i = 0; i < nr_rt_objs; i++)
+ {
+ rt_objs[i] = desc->rt_buffer[i];
+ }
+
+ desc->sz_rt_buffer_used = 0;
+ return nr_rt_objs;
+}
+
static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq,
unsigned int consq, struct tunnel_desc * desc)
{
@@ -24,36 +31,7 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
return;
}
- unsigned int nr_ring_count = desc->shared_credict_counter != NULL ? rte_ring_count(desc->tunnel_object) : 0;
- unsigned int nr_ring_to_use = nr_ring_count + desc->sz_en_buffer_used;
- unsigned int nr_shared_credict = 0;
-
- if (nr_ring_to_use > desc->tunnel_exclusive_size)
- {
- /* need to apply shared credict */
- assert(nr_ring_to_use >= (desc->tunnel_exclusive_size + desc->shared_credict_used));
-
- nr_shared_credict = nr_ring_to_use - (desc->tunnel_exclusive_size + desc->shared_credict_used);
- while (1)
- {
- uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter);
- uint32_t new_value = cur_value > nr_shared_credict ? cur_value - nr_shared_credict : 0;
-
- if (rte_atomic32_cmpset((volatile uint32_t *)desc->shared_credict_counter, cur_value, new_value))
- {
- nr_shared_credict = cur_value > new_value ? cur_value - new_value : 0;
- break;
- }
- }
-
- desc->shared_credict_used += nr_shared_credict;
- assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
- }
-
- assert((desc->tunnel_exclusive_size + desc->shared_credict_used) >= nr_ring_count);
-
- unsigned int n_can_send = (desc->tunnel_exclusive_size + desc->shared_credict_used) - nr_ring_count;
- unsigned int n_to_send = RTE_MIN(desc->sz_en_buffer_used, n_can_send);
+ unsigned int n_to_send = desc->sz_en_buffer_used;
size_t n_send_len = 0;
for (unsigned int k = 0; k < desc->sz_en_buffer_used; k++)
@@ -76,8 +54,7 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
}
unsigned int n_free_space;
- unsigned int n_send =
- rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space);
+ unsigned int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space);
unsigned int n_send_missed = desc->sz_en_buffer_used - n_send;
/* packet is missed */
@@ -87,18 +64,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
{
struct rte_mbuf * object_to_be_free = desc->en_buffer[k];
n_send_len -= rte_pktmbuf_data_len(object_to_be_free);
- infra_rte_pktmbuf_free(object_to_be_free);
- }
- /* return the shared credict */
- unsigned int nr_shared_credict_to_release = RTE_MIN(n_send_missed, nr_shared_credict);
- if (nr_shared_credict_to_release > 0)
- {
- rte_atomic32_add(desc->shared_credict_counter, (int32_t)nr_shared_credict_to_release);
- desc->shared_credict_used -= nr_shared_credict_to_release;
+ /* move the mbuf to return buffer */
+ assert(desc->sz_rt_buffer_used < desc->sz_rt_buffer);
+ desc->rt_buffer[desc->sz_rt_buffer_used++] = desc->en_buffer[k];
}
-
- assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
}
struct vnode_cons_notify * cons_notify_ctx = &cons->notify[consq];
@@ -162,22 +132,11 @@ static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_co
out:
rte_spinlock_unlock(&desc->lock_thread_safe_check);
#endif
-
- return;
}
-static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int nr_max_obj)
+static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, unsigned int nr_max_obj)
{
unsigned int nr_deq = rte_ring_sc_dequeue_burst(desc->tunnel_object, obj, nr_max_obj, NULL);
- unsigned int shared_credict_to_release = RTE_MIN(nr_deq, desc->shared_credict_used);
-
- if (shared_credict_to_release > 0)
- {
- rte_atomic32_add(desc->shared_credict_counter, (int32_t)shared_credict_to_release);
- desc->shared_credict_used -= shared_credict_to_release;
- }
-
- assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
return (int)nr_deq;
}
@@ -194,14 +153,15 @@ static inline void dist_tunnel_block_flush(struct tunnel_block * block, int prod
}
}
-static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, int prodq, struct rte_mbuf * obj[],
- uint32_t hash[], int nr_obj)
+static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, unsigned int prodq, struct rte_mbuf * obj[],
+ uint32_t hash[], unsigned int nr_obj)
{
assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST);
for (unsigned int i = 0; i < nr_obj; i++)
{
assert(obj[i] != NULL);
unsigned int consq = hash[i] % block->nr_consq;
+
struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[i]);
}
@@ -209,56 +169,35 @@ static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * blo
dist_tunnel_block_flush(block, prodq);
}
-#if 0
-
-#define __FWDSTEP 4
-
-static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block,
- int prodq, struct rte_mbuf * obj[], uint32_t hash[], int nr_obj)
+static inline unsigned int dist_tunnel_block_rt_objects_retrieve(struct tunnel_block * block, unsigned int prodq,
+ struct rte_mbuf * rt_objs[],
+ unsigned int nr_max_rt_objs)
{
- struct tunnel_desc * tunnel;
- unsigned int idx = 0;
- unsigned int consq;
+ unsigned int nr_rt_objs = 0;
+ unsigned int nr_rt_objs_left = nr_max_rt_objs;
- switch (nr_obj % 4)
+ for (unsigned int consq = 0; consq < block->nr_consq; consq++)
{
- case 0:
- while (idx != nr_obj) {
- consq = hash[idx] % block->nr_consq;
- tunnel = *tunnel_block_locate(block, prodq, consq);
- dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
- idx++;
- case 3:
- consq = hash[idx] % block->nr_consq;
- tunnel = *tunnel_block_locate(block, prodq, consq);
- dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
- idx++;
- case 2:
- consq = hash[idx] % block->nr_consq;
- tunnel = *tunnel_block_locate(block, prodq, consq);
- dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
- idx++;
- case 1:
- consq = hash[idx] % block->nr_consq;
- tunnel = *tunnel_block_locate(block, prodq, consq);
- dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[idx]);
- idx++;
- }
+ struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
+ unsigned int nr_rt_objs_recv = dist_tunnel_rt_objects_retrieve(tunnel, &rt_objs[nr_rt_objs], nr_rt_objs_left);
+ nr_rt_objs += nr_rt_objs_recv;
+ nr_rt_objs_left -= nr_rt_objs_recv;
}
-}
-#endif
+ assert(nr_rt_objs <= nr_max_rt_objs);
+ return nr_rt_objs;
+}
// Tunnel Block Dequeue, dequeue from block, only used by cons.
// TODO: rewrite in SSE/SSE2/AVX/AVX2 intrinsics
-static inline int dist_tunnel_block_dequeue(struct tunnel_block * block, int consq, struct rte_mbuf * obj[],
- int nr_max_obj)
+static inline unsigned int dist_tunnel_block_dequeue(struct tunnel_block * block, unsigned int consq, struct rte_mbuf * obj[],
+ unsigned int nr_max_obj)
{
unsigned int nr_obj = 0, nr_obj_recv = 0;
unsigned int nr_obj_left = nr_max_obj;
- for (int prodq = 0; prodq < block->nr_prodq; prodq++)
+ for (unsigned int prodq = 0; prodq < block->nr_prodq; prodq++)
{
struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
nr_obj_recv = dist_tunnel_dequeue(tunnel, &obj[nr_obj], nr_obj_left);
@@ -270,28 +209,32 @@ static inline int dist_tunnel_block_dequeue(struct tunnel_block * block, int con
return nr_obj;
}
-int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[],
- uint32_t hash[], int nr_objects)
+int vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * rt_objs[],
+ unsigned int nr_max_rt_objects)
{
- assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);
-
struct tunnel_block * block = ACCESS_ONCE(prod->block);
- if (unlikely(block == NULL))
+ if (likely(block != NULL))
{
- goto failure;
+ return (int)dist_tunnel_block_rt_objects_retrieve(block, prodq, rt_objs, nr_max_rt_objects);
}
+ else
+ {
+ return 0;
+ }
+}
- dist_tunnel_block_enqueue_with_hash(block, (int)prodq, objects, hash, nr_objects);
- return 0;
+int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[],
+ uint32_t hash[], unsigned int nr_objects)
+{
+ assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);
-failure:
- for (int i = 0; i < nr_objects; i++)
+ struct tunnel_block * block = ACCESS_ONCE(prod->block);
+ if (unlikely(block == NULL))
{
- infra_rte_pktmbuf_free(objects[i]);
+ return -1;
}
- VNODE_STAT_UPDATE(prod, prodq, on_line, nr_objects);
- VNODE_STAT_UPDATE(prod, prodq, missed, nr_objects);
+ dist_tunnel_block_enqueue_with_hash(block, prodq, objects, hash, nr_objects);
return 0;
}
@@ -301,7 +244,7 @@ int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, str
struct tunnel_block * block = ACCESS_ONCE(cons->block);
if (likely(block != NULL))
{
- return dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects);
+ return (int)dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects);
}
else
{
@@ -318,9 +261,9 @@ void vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq)
}
}
-struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_shared,
- unsigned int sz_buffer, unsigned int notify_cons_when_rx,
- unsigned int batch_interval_us, unsigned int en_q_len_monitor)
+struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_buffer,
+ unsigned int notify_cons_when_rx, unsigned int batch_interval_us,
+ unsigned int en_q_len_monitor)
{
struct vnode * vnode_common = __vnode_common_create(sym, sz_exclusive, sz_buffer, notify_cons_when_rx);
@@ -331,10 +274,7 @@ struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive,
}
vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_cycles() / US_PER_S;
- vnode_common->sz_shared = sz_shared;
vnode_common->en_q_len_monitor = en_q_len_monitor;
-
- rte_atomic32_init(&vnode_common->shared_credict_counter);
return vnode_common;
}
@@ -343,11 +283,6 @@ int vnode_mirror_delete(struct vnode * vnode)
return __vnode_common_delete(vnode);
}
-void vnode_mirror_common_unpoison(struct vnode * vnode)
-{
- __vnode_common_unpoison(vnode);
-}
-
__USE_COMMON_VNODE_CREATE_PROD(mirror)
__USE_COMMON_VNODE_CREATE_CONS(mirror)
__USE_COMMON_VNODE_DELETE_PROD(mirror)
diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc
index 6fb3dab..e6c8a50 100644
--- a/infra/test/TestVNode.cc
+++ b/infra/test/TestVNode.cc
@@ -39,7 +39,7 @@ class TestCaseVNodeQueue : public TestCaseVNode
void SetUp() override
{
- vnode_ = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0);
+ vnode_ = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
ASSERT_NE(vnode_, nullptr);
assert(prod_ == nullptr);
@@ -82,7 +82,7 @@ struct rte_mempool * TestCaseVNode::pktmbuf_pool_ = nullptr;
TEST_F(TestCaseVNode, CreateAndDeleteInEmptyNode)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
EXPECT_NE(vnode_ptr, nullptr);
int ret = vnode_mirror_delete(vnode_ptr);
@@ -91,7 +91,7 @@ TEST_F(TestCaseVNode, CreateAndDeleteInEmptyNode)
TEST_F(TestCaseVNode, CreateAndDelete)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -109,7 +109,7 @@ TEST_F(TestCaseVNode, CreateAndDelete)
TEST_F(TestCaseVNode, CreateAndDeleteMultiThread)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
/* create multiple thread and run them at same time */
@@ -141,7 +141,7 @@ TEST_F(TestCaseVNode, CreateAndDeleteMultiThread)
TEST_F(TestCaseVNode, TestVNodeProdAndConsLookup)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -164,7 +164,7 @@ TEST_F(TestCaseVNode, TestVNodeProdAndConsLookup)
TEST_F(TestCaseVNode, TestVNodeEnqueue)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -183,15 +183,15 @@ TEST_F(TestCaseVNode, TestVNodeEnqueue)
int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
ASSERT_EQ(ret, 0);
- for (unsigned int i = 0; i < RTE_DIM(enq_objs); i++)
+ for (auto & enq_obj : enq_objs)
{
- enq_objs[i]->hash.usr = 0x4d5a;
+ enq_obj->hash.usr = 0x4d5a;
}
uint32_t enq_hashs[TEST_COUNT] = {};
- for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++)
+ for (unsigned int & enq_hash : enq_hashs)
{
- enq_hashs[i] = 0x4d5a;
+ enq_hash = 0x4d5a;
}
int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, RTE_DIM(enq_hashs));
@@ -209,10 +209,10 @@ TEST_F(TestCaseVNode, TestVNodeEnqueue)
vnode_mirror_delete(vnode_ptr);
}
-TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict)
+TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
{
/* create multiple thread */
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 2048, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -239,21 +239,45 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict)
std::thread test_thread_1([&prod, &enq_objs, &enq_hashs] {
int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, 512);
EXPECT_EQ(enq_ret, 0);
+
+ struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objects, RTE_DIM(rt_objects));
+ EXPECT_EQ(rt_ret, 481);
+
+ /* free the retrieved back-pressure objects, not the shared enqueue source array */
+ rte_pktmbuf_free_bulk(rt_objects, rt_ret);
});
std::thread test_thread_2([&prod, &enq_objs, &enq_hashs] {
int enq_ret = vnode_mirror_enqueue_bulk(prod, 1, enq_objs + 512, enq_hashs, 512);
EXPECT_EQ(enq_ret, 0);
+
+ struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objects, RTE_DIM(rt_objects));
+ EXPECT_EQ(rt_ret, 481);
+
+ /* free the retrieved back-pressure objects; freeing enq_objs here would double-free
+    mbufs already freed by the other threads */
+ rte_pktmbuf_free_bulk(rt_objects, rt_ret);
});
std::thread test_thread_3([&prod, &enq_objs, &enq_hashs] {
int enq_ret = vnode_mirror_enqueue_bulk(prod, 2, enq_objs + 1024, enq_hashs, 512);
EXPECT_EQ(enq_ret, 0);
+
+ struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 2, rt_objects, RTE_DIM(rt_objects));
+ EXPECT_EQ(rt_ret, 481);
+
+ /* free the retrieved back-pressure objects; freeing enq_objs here would double-free
+    mbufs already freed by the other threads */
+ rte_pktmbuf_free_bulk(rt_objects, rt_ret);
});
std::thread test_thread_4([&prod, &enq_objs, &enq_hashs] {
int enq_ret = vnode_mirror_enqueue_bulk(prod, 3, enq_objs + 1536, enq_hashs, 512);
EXPECT_EQ(enq_ret, 0);
+
+ struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 3, rt_objects, RTE_DIM(rt_objects));
+ EXPECT_EQ(rt_ret, 481);
+
+ /* free the retrieved back-pressure objects; freeing enq_objs here would double-free
+    mbufs already freed by the other threads */
+ rte_pktmbuf_free_bulk(rt_objects, rt_ret);
});
test_thread_1.join();
@@ -275,25 +299,26 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict)
}
EXPECT_EQ(prod_on_line_total, 2048);
- EXPECT_EQ(prod_deliver_total, 2048);
- EXPECT_EQ(prod_missed_total, 0);
+ EXPECT_EQ(prod_deliver_total, 124);
+ EXPECT_EQ(prod_missed_total, 1924);
/* on cons side */
struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons);
EXPECT_EQ(cons_stat[0].on_line, 2048);
- EXPECT_EQ(cons_stat[0].deliver, 2048);
- EXPECT_EQ(cons_stat[0].missed, 0);
+ EXPECT_EQ(cons_stat[0].deliver, 124);
+ EXPECT_EQ(cons_stat[0].missed, 1924);
- int deq_ret = vnode_mirror_dequeue_burst(cons, 0, enq_objs, RTE_DIM(enq_objs));
- EXPECT_EQ(deq_ret, 2048);
+ struct rte_mbuf * deq_objs[TEST_MBUFS_COUNT];
+ int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(deq_ret, 124);
- rte_pktmbuf_free_bulk(enq_objs, deq_ret);
+ rte_pktmbuf_free_bulk(deq_objs, deq_ret);
vnode_mirror_delete(vnode_ptr);
}
TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 512, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -314,57 +339,76 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
uint32_t enq_hashs[TEST_MBUFS_COUNT] = {};
- for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++)
+ for (unsigned int & enq_hash : enq_hashs)
{
- enq_hashs[i] = 0x4d5a;
+ enq_hash = 0x4d5a;
}
/* first 512 mbufs, use the exclusive credit */
int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, 512);
EXPECT_EQ(enq_ret, 0);
- /* second 512 mbufs, use the shared credict */
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret, 481);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret);
+
+ /* at this point the ring is full, so no more objects can be enqueued */
int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 512, enq_hashs, 512);
EXPECT_EQ(enq_ret_2, 0);
- /* until here, we have 512 + 32 credict, so only 544 mbufs can be enqueue. */
+ int rt_ret_2 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret_2, 512);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret_2);
+
+ /* up to here only 31 objects were actually enqueued, so only 31 mbufs can be dequeued. */
int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
- EXPECT_EQ(deq_ret_1, 544);
+ EXPECT_EQ(deq_ret_1, 31);
struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod);
EXPECT_EQ(prod_stat->on_line, 1024);
- EXPECT_EQ(prod_stat->deliver, 544);
- EXPECT_EQ(prod_stat->missed, 480);
+ EXPECT_EQ(prod_stat->deliver, 31);
+ EXPECT_EQ(prod_stat->missed, 993);
struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons);
EXPECT_EQ(cons_stat->on_line, 1024);
- EXPECT_EQ(cons_stat->deliver, 544);
- EXPECT_EQ(cons_stat->missed, 480);
+ EXPECT_EQ(cons_stat->deliver, 31);
+ EXPECT_EQ(cons_stat->missed, 993);
/* free these mbufs */
rte_pktmbuf_free_bulk(deq_objs, deq_ret_1);
+ /*****************************************************************************************/
+
/* put another 512 mbufs */
int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1024, enq_hashs, 512);
EXPECT_EQ(enq_ret_3, 0);
+ /* retrieve the packets that need to be freed */
+ int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret_3, 481);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret_3);
+
int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1536, enq_hashs, 512);
EXPECT_EQ(enq_ret_4, 0);
+ int rt_ret_4 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret_4, 512);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret_4);
+
int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
- EXPECT_EQ(deq_ret_2, 544);
+ EXPECT_EQ(deq_ret_2, 31);
/* another round, the stat should be double */
prod_stat = vnode_mirror_prod_stat_get(prod);
cons_stat = vnode_mirror_cons_stat_get(cons);
EXPECT_EQ(prod_stat->on_line, 2048);
- EXPECT_EQ(prod_stat->deliver, 1088);
- EXPECT_EQ(prod_stat->missed, 960);
+ EXPECT_EQ(prod_stat->deliver, 62);
+ EXPECT_EQ(prod_stat->missed, 1986);
EXPECT_EQ(cons_stat->on_line, 2048);
- EXPECT_EQ(cons_stat->deliver, 1088);
- EXPECT_EQ(cons_stat->missed, 960);
+ EXPECT_EQ(cons_stat->deliver, 62);
+ EXPECT_EQ(cons_stat->missed, 1986);
rte_pktmbuf_free_bulk(deq_objs, deq_ret_2);
vnode_mirror_delete(vnode_ptr);
@@ -372,7 +416,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 512, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -407,9 +451,9 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
/* enqueue */
uint32_t enq_hashs[32] = {};
- for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++)
+ for (unsigned int & enq_hash : enq_hashs)
{
- enq_hashs[i] = 0x4d5a;
+ enq_hash = 0x4d5a;
}
/* first 32 mbufs, use the exclusive credit */
@@ -420,26 +464,37 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs_q1_shared, enq_hashs, 32);
EXPECT_EQ(enq_ret_2, 0);
- /* third 32 mbufs, use the exclusive credit */
+ /* retrieve the dropped packets */
+ struct rte_mbuf * rt_objs[128] = {};
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objs, 128);
+ EXPECT_EQ(rt_ret, 33);
+ rte_pktmbuf_free_bulk(rt_objs, rt_ret);
+
+ /* third 32 mbufs, another queue; 31 packets should be enqueued. */
int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 1, enq_objs_q2_exclusive, enq_hashs, 32);
EXPECT_EQ(enq_ret_3, 0);
- /* fourth 32 mbufs, use the shared credict */
+ /* fourth, 32 mbufs should be dropped */
int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 1, enq_objs_q2_shared, enq_hashs, 32);
EXPECT_EQ(enq_ret_4, 0);
+ /* retrieve the dropped packets */
+ rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objs, 128);
+ EXPECT_EQ(rt_ret, 33);
+ rte_pktmbuf_free_bulk(rt_objs, rt_ret);
+
/* dequeue */
struct rte_mbuf * deq_objs[128] = {};
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
- EXPECT_EQ(deq_ret, 128);
+ EXPECT_EQ(deq_ret, 62);
rte_pktmbuf_free_bulk(deq_objs, deq_ret);
vnode_mirror_delete(vnode_ptr);
}
-TEST_F(TestCaseVNode, TestVNodeEnqueueUseSharedCredict)
+TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 512, 512, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 512, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -458,34 +513,52 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueUseSharedCredict)
int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
ASSERT_EQ(ret, 0);
- for (unsigned int i = 0; i < RTE_DIM(enq_objs); i++)
+ for (auto & enq_obj : enq_objs)
{
- enq_objs[i]->hash.usr = 0x4d5a;
+ enq_obj->hash.usr = 0x4d5a;
}
uint32_t enq_hashs[TEST_MBUFS_COUNT] = {};
- for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++)
+ for (unsigned int & enq_hash : enq_hashs)
{
- enq_hashs[i] = 0x4d5a;
+ enq_hash = 0x4d5a;
}
/* first 512 mbufs, use the exclusive credit */
int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, 512);
EXPECT_EQ(enq_ret, 0);
- /* second 512 mbufs, use the shared credict */
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret, 1);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret);
+
+ /* second 512 mbufs should be rejected */
int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 512, enq_hashs, 512);
EXPECT_EQ(enq_ret_2, 0);
+ int rt_ret_2 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret_2, 512);
+ rte_pktmbuf_free_bulk(deq_objs, 512);
+
/* third 512 mbufs should be rejected */
int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1024, enq_hashs, 512);
EXPECT_EQ(enq_ret_3, 0);
+ int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret_3, 512);
+
+ /* free the rejected packets */
+ rte_pktmbuf_free_bulk(deq_objs, 512);
+
int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1536, enq_hashs, 512);
EXPECT_EQ(enq_ret_4, 0);
+ int rt_ret_4 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
+ EXPECT_EQ(rt_ret_4, 512);
+ rte_pktmbuf_free_bulk(deq_objs, 512);
+
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
- EXPECT_EQ(deq_ret, 1024);
+ EXPECT_EQ(deq_ret, 511);
rte_pktmbuf_free_bulk(deq_objs, deq_ret);
vnode_mirror_delete(vnode_ptr);
@@ -493,7 +566,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueUseSharedCredict)
TEST_F(TestCaseVNodeQueue, MultQueueEnqueue)
{
- struct vnode * vnode_ptr = vnode_mirror_create("vnode", 1024, 0, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("vnode", 1024, 32, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -512,15 +585,15 @@ TEST_F(TestCaseVNodeQueue, MultQueueEnqueue)
int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
ASSERT_EQ(ret, 0);
- for (unsigned int i = 0; i < RTE_DIM(enq_objs); i++)
+ for (auto & enq_obj : enq_objs)
{
- enq_objs[i]->hash.usr = 0x4d5a;
+ enq_obj->hash.usr = 0x4d5a;
}
uint32_t enq_hashs[TEST_COUNT] = {};
- for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++)
+ for (unsigned int & enq_hash : enq_hashs)
{
- enq_hashs[i] = 0x4d5a;
+ enq_hash = 0x4d5a;
}
for (unsigned int i = 0; i < RTE_DIM(enq_hashs); i++)
@@ -539,8 +612,8 @@ TEST_F(TestCaseVNodeQueue, MultQueueEnqueue)
EXPECT_EQ(cons_stat[0].on_line, 32);
EXPECT_EQ(cons_stat[0].deliver, 32);
EXPECT_EQ(cons_stat[0].missed, 0);
- EXPECT_EQ(cons_stat[0].q_len_max, 32);
- EXPECT_EQ(cons_stat[0].q_len_avg_max, 32);
+ EXPECT_EQ(cons_stat[0].q_len_max, 5);
+ EXPECT_LE(cons_stat[0].q_len_avg_max, 5);
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
EXPECT_EQ(deq_ret, 32);
@@ -554,9 +627,7 @@ TEST_F(TestCaseVNodeQueue, MultQueueEnqueue)
EXPECT_EQ(cons_stat[0].on_line, 32);
EXPECT_EQ(cons_stat[0].deliver, 32);
EXPECT_EQ(cons_stat[0].missed, 0);
-
- /* after dequeue, q_len should be zero */
- EXPECT_EQ(cons_stat[0].q_len_max, 32);
+ EXPECT_EQ(cons_stat[0].q_len_max, 5);
EXPECT_LE(cons_stat[0].q_len_avg_max, 32);
rte_pktmbuf_free_bulk(deq_objs, deq_ret);
diff --git a/service/include/sc_vdev.h b/service/include/sc_vdev.h
index 6314ffd..b6fad5e 100644
--- a/service/include/sc_vdev.h
+++ b/service/include/sc_vdev.h
@@ -65,26 +65,6 @@ struct _vdev
* flush : flush packet buffer
*/
-#if 0
- int (*dispatch)(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
- struct rte_mbuf * overload_pkts[], unsigned int * nr_overload_pkts, int flags);
-#endif
-
- int (*dispatch)(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags);
- int (*collect)(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags);
- int (*idle_pool)(struct _vdev * _vdev, queue_id_t qid);
- int (*stats_get)(struct _vdev * _vdev, struct vdev_stat_info * stat_info);
-
- /* 销毁函数 */
- int (*destory)(struct _vdev * _vdev);
-
- /* VDI创建 */
- struct vdev_instance * (
- *vdi_create)(struct _vdev * _vdev, const char * appsym, unsigned int nr_rxstream, unsigned int nr_txstream);
-
- /* VDI销毁 */
- int (*vdi_destory)(struct vdev_instance * vdi);
-
/* 统计信息暂存,通过外部接口设置,用于计算速度 */
struct vdev_stat_info stat_info_last;
};
@@ -122,6 +102,8 @@ int vdev_dispatch(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[],
// 收集数据
int vdev_collect(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags);
+int vdev_rt_pkts_retrieve(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts);
+
// 空闲轮询
int vdev_idle_poll(struct vdev * vdev, queue_id_t qid);
@@ -136,8 +118,22 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in
unsigned int sz_buffer, unsigned int batch_interval_in_us, unsigned int en_q_len_monitor,
struct rte_mempool * direct_pool);
-int vdev_loop_create(struct vdev_main * v_main, const char * symbol, unsigned int sz_tunnel, unsigned int sz_buffer,
- struct rte_mempool * direct_pool, struct rte_mempool * indirect_pool);
+int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
+ int flags);
+
+int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
+ int flags);
+
+int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid);
+
+int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info);
+
+struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym,
+ unsigned int nr_rxstream, unsigned int nr_txstream);
+
+int vdev_data_rt_pkts_retrieve(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts);
+
+int vdev_data_instance_destory(struct vdev_instance * vdi);
int vdev_main_init(struct sc_main * sc);
diff --git a/service/src/node_shmdev.c b/service/src/node_shmdev.c
index 61dda48..6142d15 100644
--- a/service/src/node_shmdev.c
+++ b/service/src/node_shmdev.c
@@ -176,16 +176,20 @@ uint16_t shmdev_tx_node_process(struct rte_graph * graph, struct rte_node * node
while (nr_pkts)
{
- if (nr_pkts > RTE_GRAPH_BURST_SIZE)
- {
- vdev_dispatch(shm_dev_desc, graph->id, mbufs, RTE_GRAPH_BURST_SIZE, 0);
- nr_pkts -= RTE_GRAPH_BURST_SIZE;
- mbufs += RTE_GRAPH_BURST_SIZE;
- }
- else
+ unsigned int nr_mbufs_this_batch = (nr_pkts > RTE_GRAPH_BURST_SIZE) ? RTE_GRAPH_BURST_SIZE : nr_pkts;
+ vdev_dispatch(shm_dev_desc, graph->id, mbufs, nr_mbufs_this_batch, 0);
+
+ nr_pkts -= nr_mbufs_this_batch;
+ mbufs += nr_mbufs_this_batch;
+
+ /* retrieve the backpressure packets */
+ struct rte_mbuf * rt_mbufs[RTE_GRAPH_BURST_SIZE];
+ int ret = vdev_rt_pkts_retrieve(shm_dev_desc, graph->id, rt_mbufs, RTE_GRAPH_BURST_SIZE);
+
+ /* send these packets to the pkt drop node */
+ if (unlikely(ret > 0))
{
- vdev_dispatch(shm_dev_desc, graph->id, mbufs, nr_pkts, 0);
- nr_pkts = 0;
+ rte_node_enqueue(graph, node, 0, (void **)rt_mbufs, ret);
}
}
diff --git a/service/src/vdata.c b/service/src/vdata.c
index 91d74d2..e87b52c 100644
--- a/service/src/vdata.c
+++ b/service/src/vdata.c
@@ -13,7 +13,7 @@
#include <sc_common.h>
#include <sc_vdev.h>
-static int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
+int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
int flags)
{
hash_t hash_value[MR_BURST_MAX];
@@ -32,7 +32,7 @@ static int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_m
return vnode_mirror_enqueue_bulk(_vdev->vnode_rx_prod, qid, pkts, hash_value, nr_pkts);
}
-static int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
+int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts,
int flags)
{
int __nr_mbufs_out = 0;
@@ -61,13 +61,18 @@ static int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mb
return __nr_mbufs_out;
}
-static int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid)
+int vdev_data_rt_pkts_retrieve(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts)
+{
+ return vnode_mirror_rt_object_retrieve(_vdev->vnode_rx_prod, qid, pkts, nr_pkts);
+}
+
+int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid)
{
vnode_mirror_flush(_vdev->vnode_rx_prod, qid);
return 0;
}
-static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info)
+int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info)
{
struct vnode_prod_stat * st_prod_rx = vnode_mirror_prod_stat_get(_vdev->vnode_rx_prod);
struct vnode_cons_stat * st_cons_tx = vnode_mirror_cons_stat_get(_vdev->vnode_tx_cons);
@@ -80,13 +85,6 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta
stat_info->rx_deliver[i] = VNODE_STAT_READ(&st_prod_rx[i].deliver);
stat_info->rx_missed[i] = VNODE_STAT_READ(&st_prod_rx[i].missed);
stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].total_len);
-
-#if 0
- stat_info->notify_state_running[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_running);
- stat_info->notify_state_ready[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_ready);
- stat_info->notify_state_waiting[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_waiting);
-#endif
-
stat_info->batch_size_count[i] = VNODE_STAT_READ(&st_prod_rx[i].batch_size_count);
stat_info->batch_size_total[i] = VNODE_STAT_READ(&st_prod_rx[i].batch_size_total);
}
@@ -107,7 +105,7 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta
return 0;
}
-static int vdev_data_instance_destory(struct vdev_instance * vdi)
+int vdev_data_instance_destory(struct vdev_instance * vdi)
{
if (vdi->vnode_rx_cons != NULL)
vnode_mirror_delete_cons(vdi->vnode_rx_cons);
@@ -121,7 +119,7 @@ static int vdev_data_instance_destory(struct vdev_instance * vdi)
}
/* 创建虚设备实例,应用使用 */
-static struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym,
+struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym,
unsigned int nr_rxstream, unsigned int nr_txstream)
{
struct vdev_instance * vdi = ZMALLOC(sizeof(struct vdev_instance));
@@ -227,12 +225,11 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in
snprintf(vnode_sym_ltx, sizeof(vnode_sym_ltx), "%s-ltx", vdev_info->symbol);
/* 创建VNODE */
- _vdev->vnode_rx = vnode_mirror_create(vnode_sym_rx, sz_tunnel_rx_exclusive, sz_tunnel_rx_shared, sz_buffer, 1,
- batch_interval_in_us, en_q_len_monitor);
-
- _vdev->vnode_tx = vnode_mirror_create(vnode_sym_tx, sz_tunnel_tx, 0, sz_buffer, 0, 0, 0);
- _vdev->vnode_ftx = vnode_mirror_create(vnode_sym_ftx, sz_tunnel_tx, 0, 0, 0, 0, 0);
- _vdev->vnode_ltx = vnode_mirror_create(vnode_sym_ltx, sz_tunnel_tx, 0, 0, 0, 0, 0);
+ _vdev->vnode_rx =
+ vnode_mirror_create(vnode_sym_rx, sz_tunnel_rx_exclusive, sz_buffer, 1, batch_interval_in_us, en_q_len_monitor);
+ _vdev->vnode_tx = vnode_mirror_create(vnode_sym_tx, sz_tunnel_tx, sz_buffer, 0, 0, 0);
+ _vdev->vnode_ftx = vnode_mirror_create(vnode_sym_ftx, sz_tunnel_tx, 0, 0, 0, 0);
+ _vdev->vnode_ltx = vnode_mirror_create(vnode_sym_ltx, sz_tunnel_tx, 0, 0, 0, 0);
#define ERR_VERIFY(x, ...) \
do \
@@ -259,15 +256,6 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in
ERR_VERIFY(_vdev->vnode_tx_cons, "Create vdev %s tx vnode consumer failed. ", vdev_info->symbol);
ERR_VERIFY(_vdev->vnode_ftx_cons, "Create vdev %s fast tx vnode consumer failed. ", vdev_info->symbol);
- /* 注册回调函数 */
- _vdev->dispatch = vdev_data_dispatch;
- _vdev->collect = vdev_data_collect;
- _vdev->idle_pool = vdev_data_idle_poll;
- _vdev->destory = vdev_data_destory;
- _vdev->stats_get = vdev_data_stats_get;
- _vdev->vdi_create = vdev_data_instance_create;
- _vdev->vdi_destory = vdev_data_instance_destory;
-
/* 加入虚设备列表 */
MR_VERIFY(v_main->vdev_max_idx <= RTE_DIM(v_main->_vdev_array));
vdev_info->port_id = v_main->vdev_max_idx;
@@ -277,7 +265,10 @@ int vdev_data_create(struct vdev_main * v_main, const char * symbol, unsigned in
errout:
if (_vdev != NULL)
+ {
vdev_data_destory(_vdev);
+ }
+
rte_free(_vdev);
return RT_ERR;
diff --git a/service/src/vdev.c b/service/src/vdev.c
index 491f0f7..7ce3eff 100644
--- a/service/src/vdev.c
+++ b/service/src/vdev.c
@@ -62,27 +62,33 @@ int vdev_dispatch(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[],
{
struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev);
mr_pdump_tx(vdev->port_id, qid, pkts, nr_pkts);
- return _vdev->dispatch(_vdev, qid, pkts, nr_pkts, flags);
+ return vdev_data_dispatch(_vdev, qid, pkts, nr_pkts, flags);
}
int vdev_collect(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags)
{
struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev);
- unsigned int rx_nr_mbufs = _vdev->collect(_vdev, qid, pkts, nr_pkts, flags);
+ unsigned int rx_nr_mbufs = vdev_data_collect(_vdev, qid, pkts, nr_pkts, flags);
mr_pdump_rx(vdev->port_id, qid, pkts, rx_nr_mbufs);
- return rx_nr_mbufs;
+ return (int)rx_nr_mbufs;
+}
+
+int vdev_rt_pkts_retrieve(struct vdev * vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts)
+{
+ struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev);
+ return vdev_data_rt_pkts_retrieve(_vdev, qid, pkts, nr_pkts);
}
int vdev_idle_poll(struct vdev * vdev, queue_id_t qid)
{
struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev);
- return _vdev->idle_pool(_vdev, qid);
+ return vdev_data_idle_poll(_vdev, qid);
}
int vdev_stats_get(struct vdev * vdev, struct vdev_stat_info * stat_info)
{
struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev);
- return _vdev->stats_get(_vdev, stat_info);
+ return vdev_data_stats_get(_vdev, stat_info);
}
void vdev_stats_last_save(struct vdev * vdev, struct vdev_stat_info * stat_info_last)
@@ -136,10 +142,8 @@ static int vdev_instance_create_handler(const struct rte_mp_msg * msg, const voi
}
struct _vdev * _vdev = container_of(vdev, struct _vdev, vdev);
-
- /* 申请VDI */
struct vdev_instance * vdi =
- _vdev->vdi_create(_vdev, app_object->symbol, msg_req->nr_rxstream, msg_req->nr_txstream);
+ vdev_data_instance_create(_vdev, app_object->symbol, msg_req->nr_rxstream, msg_req->nr_txstream);
if (vdi == NULL)
{
@@ -210,9 +214,7 @@ static void shmdev_event_handler_app_unregister(struct app_main * app_main, stru
/* Mark the vdev as not in use */
vdi->vdev->in_use = VDEV_NOT_IN_USE;
-
- struct _vdev * _vdev = container_of(vdi->vdev, struct _vdev, vdev);
- _vdev->vdi_destory(vdi);
+ vdev_data_instance_destory(vdi);
}
FREE(vdev_main_pme);