author    Lu Qiuwen <[email protected]>  2024-04-02 19:32:36 +0800
committer Lu Qiuwen <[email protected]>  2024-04-14 12:38:19 +0000
commit    a14d9eab5384eaca1925670486b5e3b54dca390e (patch)
tree      cf94513513fd6968f97c33a0aef1ce0b9b7f245e
parent    4ff22e7ad20fdb65cc26bc10400b67802df07b27 (diff)
support limiting the max in-flight objects in a vnode.
-rw-r--r--  infra/include/vnode.h      2
-rw-r--r--  infra/src/vnode_common.h   7
-rw-r--r--  infra/src/vnode_mirror.c   111
-rw-r--r--  infra/test/TestVNode.cc    260
4 files changed, 363 insertions(+), 17 deletions(-)
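
This patch adds a credit-based cap on the number of objects that may be in flight inside a vnode at any one time. The cap is passed as the new sz_max_inflight argument of vnode_mirror_create() (the slot formerly named sz_buffer); passing 0 disables the limit. When a producer's burst cannot acquire enough credits, the whole burst is dropped and handed back to the producer through vnode_mirror_rt_object_retrieve(); dequeuing on the consumer side releases the credits again. A minimal usage sketch mirroring the new tests (it assumes an already-initialised EAL and an mbuf pool named pool, and omits error checking):

    /* cap the node at 64 in-flight objects; the last three arguments are
     * notify_cons_when_rx, batch_interval_us and en_q_len_monitor */
    struct vnode * node = vnode_mirror_create("m-vnode", 16384, 64, 0, 0, 0);
    struct vnode_prod * prod = vnode_mirror_create_prod(node, "prod", 8);
    struct vnode_cons * cons = vnode_mirror_create_cons(node, "cons", 1);

    struct rte_mbuf * objs[64];
    uint32_t hashes[64];
    rte_pktmbuf_alloc_bulk(pool, objs, 64);
    for (unsigned int i = 0; i < 64; i++)
        hashes[i] = objs[i]->hash.usr = 0x4d5a;

    vnode_mirror_enqueue_bulk(prod, 0, objs, hashes, 64);

    /* objects refused for lack of credits come back to the producer */
    struct rte_mbuf * returned[64];
    int nr_returned = vnode_mirror_rt_object_retrieve(prod, 0, returned, 64);
    rte_pktmbuf_free_bulk(returned, nr_returned);

    /* dequeuing on the consumer releases the credits for later bursts */
    struct rte_mbuf * out[64];
    int nr_out = vnode_mirror_dequeue_burst(cons, 0, out, 64);
    rte_pktmbuf_free_bulk(out, nr_out);

    vnode_mirror_delete(node);
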
diff --git a/infra/include/vnode.h b/infra/include/vnode.h
index 95dd8e5..478a995 100644
--- a/infra/include/vnode.h
+++ b/infra/include/vnode.h
@@ -117,7 +117,7 @@ int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, str
int vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[],
unsigned int nr_max_objects);
-struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_buffer,
+struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_max_inflight,
unsigned int notify_cons_when_rx, unsigned int batch_interval_us,
unsigned int en_q_len_monitor);
diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h
index 25808b4..71873d1 100644
--- a/infra/src/vnode_common.h
+++ b/infra/src/vnode_common.h
@@ -38,6 +38,9 @@ struct tunnel_desc
/* second cacheline, read/write */
RTE_MARKER cacheline1 __rte_cache_min_aligned;
+ int32_t inflight_credits;
+ unsigned int send_credits_watermark;
+
/* Tunnel Enqueue Buffer Size */
unsigned int sz_en_buffer;
/* Tunnel Enqueue Buffer Used */
@@ -134,6 +137,10 @@ struct vnode
/* q_len monitor */
unsigned int en_q_len_monitor;
+ /* atomic, inflight credit */
+ int32_t credits_on_loan;
+ int32_t max_inflight;
+
/* Guarantees one operator(consumer or producer, create or destroy) a time */
rte_spinlock_t lock __rte_cache_aligned;
};
diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c
index cef42bf..9859dd3 100644
--- a/infra/src/vnode_mirror.c
+++ b/infra/src/vnode_mirror.c
@@ -19,6 +19,68 @@ static inline unsigned int dist_tunnel_rt_objects_retrieve(struct tunnel_desc *
return nr_rt_objs;
}
+static bool dist_tunnel_acquire_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, int32_t credits)
+{
+ int32_t inflight_credits = tunnel_desc->inflight_credits;
+ int32_t missing_credits = credits - inflight_credits;
+
+ int32_t total_on_loan;
+ int32_t available;
+ int32_t acquired_credits;
+ int32_t new_total_on_loan;
+
+ if (likely(missing_credits <= 0))
+ {
+ tunnel_desc->inflight_credits -= credits;
+ return true;
+ }
+
+#define DIST_TUNNEL_DESC_MIN_CREDITS (64)
+ total_on_loan = __atomic_load_n(&vnode_desc->credits_on_loan, __ATOMIC_RELAXED);
+ available = vnode_desc->max_inflight - total_on_loan;
+
+ /* acquire at least DIST_TUNNEL_DESC_MIN_CREDITS credits per refill so the shared counter is not updated on every small burst */
+ acquired_credits = RTE_MAX(missing_credits, DIST_TUNNEL_DESC_MIN_CREDITS);
+
+ if (available < acquired_credits)
+ {
+ return false;
+ }
+
+ /* This check-then-add is a race: no locks are involved, so another
+ * thread may acquire credits between the check above and the atomic
+ * add below. An over-allocation is detected and rolled back.
+ */
+ new_total_on_loan =
+ __atomic_fetch_add(&vnode_desc->credits_on_loan, acquired_credits, __ATOMIC_RELAXED) + acquired_credits;
+
+ if (unlikely(new_total_on_loan > vnode_desc->max_inflight))
+ {
+ /* another tunnel took the last credits; roll back the reservation */
+ __atomic_fetch_sub(&vnode_desc->credits_on_loan, acquired_credits, __ATOMIC_RELAXED);
+ return false;
+ }
+
+ tunnel_desc->inflight_credits += acquired_credits;
+ tunnel_desc->inflight_credits -= credits;
+ return true;
+}
+
+static void dist_tunnel_return_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, int32_t credits)
+{
+ tunnel_desc->inflight_credits += credits;
+
+#define DIST_TUNNEL_MAX_CREDITS (2 * DIST_TUNNEL_DESC_MIN_CREDITS)
+ if (unlikely(tunnel_desc->inflight_credits > DIST_TUNNEL_MAX_CREDITS))
+ {
+ int32_t leave_credits = DIST_TUNNEL_MAX_CREDITS;
+ int32_t return_credits = tunnel_desc->inflight_credits - leave_credits;
+
+ tunnel_desc->inflight_credits = leave_credits;
+ __atomic_fetch_sub(&vnode_desc->credits_on_loan, return_credits, __ATOMIC_RELAXED);
+ }
+}
+
static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons * cons, unsigned int prodq,
unsigned int consq, struct tunnel_desc * desc)
{
@@ -51,8 +113,21 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE * 2));
}
- unsigned int n_free_space;
- unsigned int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space);
+ unsigned int n_free_space = 0;
+ unsigned int n_send = 0;
+
+ bool is_acquire_credit_success = dist_tunnel_acquire_credits(prod->vnode, desc, (int32_t)n_to_send);
+
+ /* if acquiring credits failed, drop the whole burst */
+ if (likely(is_acquire_credit_success))
+ {
+ n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space);
+ }
+ else
+ {
+ n_send = 0;
+ }
+
unsigned int n_send_missed = desc->sz_en_buffer_used - n_send;
/* packet is missed */
@@ -67,6 +142,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
assert(desc->sz_rt_buffer_used < desc->sz_rt_buffer);
desc->rt_buffer[desc->sz_rt_buffer_used++] = desc->en_buffer[k];
}
+
+ if (is_acquire_credit_success)
+ {
+ dist_tunnel_return_credits(prod->vnode, desc, (int32_t)n_send_missed);
+ }
}
struct vnode_cons_notify * cons_notify_ctx = &cons->notify[consq];
@@ -132,9 +212,10 @@ out:
#endif
}
-static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, unsigned int nr_max_obj)
+static inline int dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj, unsigned int nr_max_obj)
{
- unsigned int nr_deq = rte_ring_sc_dequeue_burst(desc->tunnel_object, obj, nr_max_obj, NULL);
+ unsigned int nr_deq = rte_ring_sc_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL);
+ dist_tunnel_return_credits(vnode_desc, tunnel_desc, (int32_t)nr_deq);
return (int)nr_deq;
}
@@ -189,7 +270,8 @@ static inline unsigned int dist_tunnel_block_rt_objects_retrieve(struct tunnel_b
// Tunnel Block Dequeue, dequeue from block, only used by cons.
// TODO: rewrite in SSE/SSE2/AVX/AVX2 intrinsics
-static inline unsigned int dist_tunnel_block_dequeue(struct tunnel_block * block, unsigned int consq, struct rte_mbuf * obj[],
+static inline unsigned int dist_tunnel_block_dequeue(struct vnode * vnode_desc, struct tunnel_block * block,
+ unsigned int consq, struct rte_mbuf * obj[],
unsigned int nr_max_obj)
{
unsigned int nr_obj = 0, nr_obj_recv = 0;
@@ -198,7 +280,7 @@ static inline unsigned int dist_tunnel_block_dequeue(struct tunnel_block * block
for (unsigned int prodq = 0; prodq < block->nr_prodq; prodq++)
{
struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
- nr_obj_recv = dist_tunnel_dequeue(tunnel, &obj[nr_obj], nr_obj_left);
+ nr_obj_recv = dist_tunnel_dequeue(vnode_desc, tunnel, &obj[nr_obj], nr_obj_left);
nr_obj += nr_obj_recv;
nr_obj_left -= nr_obj_recv;
}
@@ -242,7 +324,7 @@ int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, str
struct tunnel_block * block = ACCESS_ONCE(cons->block);
if (likely(block != NULL))
{
- return (int)dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects);
+ return (int)dist_tunnel_block_dequeue(cons->vnode, block, consq, objects, nr_max_objects);
}
else
{
@@ -259,11 +341,11 @@ void vnode_mirror_flush(struct vnode_prod * prod, unsigned int prodq)
}
}
-struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_buffer,
+struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, unsigned int sz_max_inflight,
unsigned int notify_cons_when_rx, unsigned int batch_interval_us,
unsigned int en_q_len_monitor)
{
- struct vnode * vnode_common = __vnode_common_create(sym, sz_exclusive, sz_buffer, notify_cons_when_rx);
+ struct vnode * vnode_common = __vnode_common_create(sym, sz_exclusive, sz_max_inflight, notify_cons_when_rx);
if (vnode_common == NULL)
{
@@ -271,6 +353,17 @@ struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive,
return NULL;
}
+ /* sz_max_inflight == 0 disables the in-flight limit */
+ vnode_common->max_inflight = (sz_max_inflight == 0) ? INT32_MAX : (int32_t)sz_max_inflight;
+ vnode_common->credits_on_loan = 0;
+
vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_cycles() / US_PER_S;
vnode_common->en_q_len_monitor = en_q_len_monitor;
return vnode_common;
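
The accounting above follows a lock-free reserve-then-validate pattern: each tunnel keeps a private credit cache, refilled in chunks of at least DIST_TUNNEL_DESC_MIN_CREDITS and trimmed back to DIST_TUNNEL_MAX_CREDITS when packets are dequeued or returned, and only the refill and trim touch the shared credits_on_loan counter with relaxed atomics. Because the availability check and the atomic add are separate operations, an over-reservation can slip through; it is detected from the fetch_add return value and rolled back. A stripped-down sketch of that reserve/rollback step on a standalone counter (budget and on_loan are hypothetical stand-ins for vnode->max_inflight and vnode->credits_on_loan, for illustration only):

    #include <stdbool.h>
    #include <stdint.h>

    static int32_t budget = 64;   /* stands in for vnode->max_inflight    */
    static int32_t on_loan = 0;   /* stands in for vnode->credits_on_loan */

    static bool reserve(int32_t want)
    {
        /* optimistic check: another thread may reserve between this load
         * and the add below, so the result of the add is re-validated */
        int32_t outstanding = __atomic_load_n(&on_loan, __ATOMIC_RELAXED);
        if (budget - outstanding < want)
            return false;

        int32_t now = __atomic_fetch_add(&on_loan, want, __ATOMIC_RELAXED) + want;
        if (now > budget)
        {
            /* overshot: another thread took the last credits, undo */
            __atomic_fetch_sub(&on_loan, want, __ATOMIC_RELAXED);
            return false;
        }
        return true;
    }

Callers that fail the reservation simply drop the burst rather than retry, matching dist_tunnel_flush() above.
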
diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc
index e6c8a50..77d5bb5 100644
--- a/infra/test/TestVNode.cc
+++ b/infra/test/TestVNode.cc
@@ -39,7 +39,7 @@ class TestCaseVNodeQueue : public TestCaseVNode
void SetUp() override
{
- vnode_ = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
+ vnode_ = vnode_mirror_create("m-vnode", 1024, 0, 0, 0, 0);
ASSERT_NE(vnode_, nullptr);
assert(prod_ == nullptr);
@@ -164,7 +164,7 @@ TEST_F(TestCaseVNode, TestVNodeProdAndConsLookup)
TEST_F(TestCaseVNode, TestVNodeEnqueue)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 1024, 0, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -212,7 +212,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueue)
TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
{
/* create multiple thread */
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 0, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -318,7 +318,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 0, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -416,7 +416,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 32, 0, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -494,7 +494,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue)
{
- struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 512, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 512, 0, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;
@@ -564,9 +564,255 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue)
vnode_mirror_delete(vnode_ptr);
}
+TEST_F(TestCaseVNode, TestVNodeEnableMaxInFlight_NoEnoughCredits)
+{
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 16384, 32, 0, 0, 0);
+ ASSERT_NE(vnode_ptr, nullptr);
+
+ struct vnode_prod * prod = vnode_mirror_create_prod(vnode_ptr, "prod", 8);
+ ASSERT_NE(prod, nullptr);
+
+ struct vnode_cons * cons = vnode_mirror_create_cons(vnode_ptr, "cons", 1);
+ ASSERT_NE(cons, nullptr);
+
+ static constexpr int test_object_count = 64;
+ struct rte_mbuf * enq_objs[test_object_count] = {};
+ struct rte_mbuf * deq_objs[test_object_count] = {};
+
+ int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
+ ASSERT_EQ(ret, 0);
+
+ for (auto & enq_obj : enq_objs)
+ {
+ enq_obj->hash.usr = 0x4d5a;
+ }
+
+ uint32_t enq_hashs[test_object_count] = {};
+ for (unsigned int & enq_hash : enq_hashs)
+ {
+ enq_hash = 0x4d5a;
+ }
+
+ /* the burst exceeds the credit limit of 32, so all packets should be dropped */
+ int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, RTE_DIM(enq_hashs));
+ EXPECT_EQ(enq_ret, 0);
+
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(rt_ret, 64);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret);
+
+ int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(deq_ret, 0);
+ rte_pktmbuf_free_bulk(deq_objs, deq_ret);
+
+ vnode_mirror_delete(vnode_ptr);
+}
+
+TEST_F(TestCaseVNode, TestVNodeEnableMaxInFlight_EnoughCredits)
+{
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 16384, 64, 0, 0, 0);
+ ASSERT_NE(vnode_ptr, nullptr);
+
+ struct vnode_prod * prod = vnode_mirror_create_prod(vnode_ptr, "prod", 8);
+ ASSERT_NE(prod, nullptr);
+
+ struct vnode_cons * cons = vnode_mirror_create_cons(vnode_ptr, "cons", 1);
+ ASSERT_NE(cons, nullptr);
+
+ static constexpr int test_object_count = 64;
+ struct rte_mbuf * enq_objs[test_object_count] = {};
+ struct rte_mbuf * deq_objs[test_object_count] = {};
+
+ int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
+ ASSERT_EQ(ret, 0);
+
+ for (auto & enq_obj : enq_objs)
+ {
+ enq_obj->hash.usr = 0x4d5a;
+ }
+
+ uint32_t enq_hashs[test_object_count] = {};
+ for (unsigned int & enq_hash : enq_hashs)
+ {
+ enq_hash = 0x4d5a;
+ }
+
+ /* the burst fits within the credit limit of 64, so all packets should be enqueued */
+ int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, RTE_DIM(enq_hashs));
+ EXPECT_EQ(enq_ret, 0);
+
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(rt_ret, 0);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret);
+
+ int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(deq_ret, 64);
+ rte_pktmbuf_free_bulk(deq_objs, deq_ret);
+
+ vnode_mirror_delete(vnode_ptr);
+}
+
+TEST_F(TestCaseVNode, TestVNodeEnableMaxInFlight_RefillCredits)
+{
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 16384, 64, 0, 0, 0);
+ ASSERT_NE(vnode_ptr, nullptr);
+
+ struct vnode_prod * prod = vnode_mirror_create_prod(vnode_ptr, "prod", 8);
+ ASSERT_NE(prod, nullptr);
+
+ struct vnode_cons * cons = vnode_mirror_create_cons(vnode_ptr, "cons", 1);
+ ASSERT_NE(cons, nullptr);
+
+ static constexpr int test_object_count = 64;
+ struct rte_mbuf * enq_objs[test_object_count] = {};
+ struct rte_mbuf * deq_objs[test_object_count] = {};
+
+ int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
+ ASSERT_EQ(ret, 0);
+
+ for (auto & enq_obj : enq_objs)
+ {
+ enq_obj->hash.usr = 0x4d5a;
+ }
+
+ uint32_t enq_hashs[test_object_count] = {};
+ for (unsigned int & enq_hash : enq_hashs)
+ {
+ enq_hash = 0x4d5a;
+ }
+
+ /* first round, all the packets should be enqueued */
+ int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, RTE_DIM(enq_hashs));
+ EXPECT_EQ(enq_ret, 0);
+
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(rt_ret, 0);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret);
+
+ /* second round, the credits are exhausted, so the packets should be dropped */
+ struct rte_mbuf * enq_objs_2[test_object_count] = {};
+ int ret_2 = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs_2, RTE_DIM(enq_objs_2));
+ ASSERT_EQ(ret_2, 0);
+
+ for (auto & enq_obj : enq_objs_2)
+ {
+ enq_obj->hash.usr = 0x4d5a;
+ }
+
+ uint32_t enq_hashs_2[test_object_count] = {};
+ for (unsigned int & enq_hash : enq_hashs_2)
+ {
+ enq_hash = 0x4d5a;
+ }
+
+ int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs_2, enq_hashs_2, RTE_DIM(enq_hashs_2));
+ EXPECT_EQ(enq_ret_2, 0);
+
+ int rt_ret_2 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(rt_ret_2, 64);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret_2);
+
+ /* then, dequeue, the credits should be refilled */
+ int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(deq_ret, 64);
+ rte_pktmbuf_free_bulk(deq_objs, deq_ret);
+
+ /* third round, all the packets should be enqueued */
+ struct rte_mbuf * enq_objs_3[test_object_count] = {};
+ int ret_3 = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs_3, RTE_DIM(enq_objs_3));
+ ASSERT_EQ(ret_3, 0);
+
+ for (auto & enq_obj : enq_objs_3)
+ {
+ enq_obj->hash.usr = 0x4d5a;
+ }
+
+ uint32_t enq_hashs_3[test_object_count] = {};
+ for (unsigned int & enq_hash : enq_hashs_3)
+ {
+ enq_hash = 0x4d5a;
+ }
+
+ int enq_ret_3 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs_3, enq_hashs_3, RTE_DIM(enq_hashs_3));
+ EXPECT_EQ(enq_ret_3, 0);
+
+ int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(rt_ret_3, 0);
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret_3);
+
+ /* dequeue */
+ int deq_ret_3 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(deq_ret_3, 64);
+ rte_pktmbuf_free_bulk(deq_objs, deq_ret_3);
+
+ vnode_mirror_delete(vnode_ptr);
+}
+
+TEST_F(TestCaseVNode, TestVNodeEnableMaxInFlight_MultipleThreadProducer)
+{
+ struct vnode * vnode_ptr = vnode_mirror_create("m-vnode", 16384, 64, 0, 0, 0);
+ ASSERT_NE(vnode_ptr, nullptr);
+
+ struct vnode_prod * prod = vnode_mirror_create_prod(vnode_ptr, "prod", 4);
+ ASSERT_NE(prod, nullptr);
+
+ struct vnode_cons * cons = vnode_mirror_create_cons(vnode_ptr, "cons", 1);
+ ASSERT_NE(cons, nullptr);
+
+ rte_atomic32_t total_dropped_objects;
+ rte_atomic32_clear(&total_dropped_objects);
+
+ auto f_producer = [vnode_ptr, prod, &total_dropped_objects](unsigned int hash) {
+ struct rte_mbuf * enq_objs[64] = {};
+ struct rte_mbuf * deq_objs[64] = {};
+
+ int ret = rte_pktmbuf_alloc_bulk(pktmbuf_pool_, enq_objs, RTE_DIM(enq_objs));
+ ASSERT_EQ(ret, 0);
+
+ for (auto & enq_obj : enq_objs)
+ {
+ enq_obj->hash.usr = hash;
+ }
+
+ uint32_t enq_hashs[64] = {};
+ for (unsigned int & enq_hash : enq_hashs)
+ {
+ enq_hash = hash;
+ }
+
+ int enq_ret = vnode_mirror_enqueue_bulk(prod, 0, enq_objs, enq_hashs, RTE_DIM(enq_hashs));
+ EXPECT_EQ(enq_ret, 0);
+
+ int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, RTE_DIM(deq_objs));
+ rte_pktmbuf_free_bulk(deq_objs, rt_ret);
+ rte_atomic32_add(&total_dropped_objects, rt_ret);
+ };
+
+ std::thread t_producer_1(f_producer, 0x4d5a);
+ std::thread t_producer_2(f_producer, 0x4d6a);
+ std::thread t_producer_3(f_producer, 0x4d7a);
+ std::thread t_producer_4(f_producer, 0x4d8a);
+
+ t_producer_1.join();
+ t_producer_2.join();
+ t_producer_3.join();
+ t_producer_4.join();
+
+ EXPECT_EQ(rte_atomic32_read(&total_dropped_objects), 192);
+
+ /* dequeue */
+ struct rte_mbuf * deq_objs[64] = {};
+ int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
+ EXPECT_EQ(deq_ret, 64);
+ rte_pktmbuf_free_bulk(deq_objs, deq_ret);
+
+ /* delete the vnode */
+ vnode_mirror_delete(vnode_ptr);
+}
+
TEST_F(TestCaseVNodeQueue, MultQueueEnqueue)
{
- struct vnode * vnode_ptr = vnode_mirror_create("vnode", 1024, 32, 0, 0, 0);
+ struct vnode * vnode_ptr = vnode_mirror_create("vnode", 1024, 0, 0, 0, 0);
ASSERT_NE(vnode_ptr, nullptr);
struct vnode_prod * prod;