author	Lu Qiuwen <[email protected]>	2023-09-01 10:19:04 +0800
committer	Lu Qiuwen <[email protected]>	2023-09-01 15:06:41 +0800
commit	42a34334ebe495254b43a4e532bb3d9292e2ed42 (patch)
tree	cc599d7dd58cd3bd4a5174839a2088797d5340c4
parent	5e5afdd30f9274038a3ed6130a8f163cd4f4391b (diff)
Adjust the way shared_counter is counted; fix the problem of shared_counter becoming negative.	v4.6.49-20230901
-rw-r--r--	infra/include/common.h   |  1
-rw-r--r--	infra/include/vnode.h    |  4
-rw-r--r--	infra/src/vnode_common.c |  3
-rw-r--r--	infra/src/vnode_common.h |  6
-rw-r--r--	infra/src/vnode_mirror.c | 90
-rw-r--r--	infra/test/TestVNode.cc  | 42
-rw-r--r--	service/src/vdata.c      |  2
7 files changed, 109 insertions(+), 39 deletions(-)
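
The core of the change is the shared-credit accounting in dist_tunnel_flush() and dist_tunnel_dequeue(): credits are now acquired through a compare-and-set loop that clamps the pool at zero, credits already held are subtracted before asking for more, and the release paths give back no more than what is actually in use. The standalone sketch below models that acquire/release discipline; C11 atomics stand in for DPDK's rte_atomic32_* helpers, and the names credit_pool, credit_acquire and credit_release are illustrative, not part of this patch.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* Standalone model of the shared-credit accounting introduced by this commit.
 * Names are illustrative (not the patch's API); C11 atomics replace the
 * DPDK rte_atomic32_* helpers used in the real code. */
struct credit_pool {
    _Atomic uint32_t shared_free;   /* global pool, like shared_credict_counter */
    uint32_t         shared_used;   /* per-tunnel usage, like shared_credict_used */
};

/* Take up to `want` credits; clamp at zero rather than going negative. */
static uint32_t credit_acquire(struct credit_pool * p, uint32_t want)
{
    uint32_t cur, newv;
    do {
        cur  = atomic_load(&p->shared_free);
        newv = cur > want ? cur - want : 0;
    } while (!atomic_compare_exchange_weak(&p->shared_free, &cur, newv));
    uint32_t got = cur - newv;      /* may be less than `want` */
    p->shared_used += got;
    return got;
}

/* Return only what was actually consumed, never more than shared_used. */
static void credit_release(struct credit_pool * p, uint32_t done)
{
    uint32_t give_back = done < p->shared_used ? done : p->shared_used;
    if (give_back > 0) {
        atomic_fetch_add(&p->shared_free, give_back);
        p->shared_used -= give_back;
    }
}

int main(void)
{
    struct credit_pool pool = { .shared_free = 16, .shared_used = 0 };
    uint32_t got = credit_acquire(&pool, 24);       /* only 16 available */
    printf("acquired %u, pool now %u\n", got, atomic_load(&pool.shared_free));
    credit_release(&pool, 10);                       /* a partial dequeue */
    printf("after release: pool %u, used %u\n",
           atomic_load(&pool.shared_free), pool.shared_used);
    return 0;
}
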
diff --git a/infra/include/common.h b/infra/include/common.h
index ef39b99..f88b0b8 100644
--- a/infra/include/common.h
+++ b/infra/include/common.h
@@ -248,6 +248,7 @@ static inline int parser_uint(const char * str)
// TODO: support masks with more bits
typedef uint64_t mask_t;
+#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
/* ================================= MASK ======================================== */
static unsigned int inline mask_popcnt(mask_t mask)
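
The ACCESS_ONCE macro added above forces a single volatile load, so the compiler cannot re-read a pointer that another thread may swap concurrently; this commit applies it further down when loading prod->block and cons->block. A minimal sketch of the intended usage pattern, assuming the GCC typeof extension; struct consumer and consume() are hypothetical stand-ins, not code from this repository.

/* Hypothetical illustration of the ACCESS_ONCE pattern; only the macro
 * itself comes from the patch. */
#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))

struct block;                              /* opaque, published by another thread */
struct consumer { struct block * block; };

static int consume(struct consumer * c)
{
    /* Load the pointer exactly once; every later use sees the same snapshot
     * even if another thread replaces c->block in the meantime. */
    struct block * b = ACCESS_ONCE(c->block);
    if (b == NULL)
        return -1;                         /* not attached yet */
    /* ... operate on b ... */
    return 0;
}

int main(void)
{
    struct consumer c = { .block = NULL };
    return consume(&c) == -1 ? 0 : 1;      /* detached consumer bails out */
}
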
diff --git a/infra/include/vnode.h b/infra/include/vnode.h
index 0593b22..e7d69ba 100644
--- a/infra/include/vnode.h
+++ b/infra/include/vnode.h
@@ -97,8 +97,8 @@ struct vnode_prod_stat
volatile uint64_t missed;
volatile uint64_t total_len;
- volatile uint64_t cloned;
- volatile uint64_t cloned_fail;
+ volatile int64_t ring_elem_count_avg;
+ volatile int64_t ring_shared_credict_avg;
/* notify */
volatile uint64_t notify_state_waiting;
diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c
index 1f077aa..db6e9c8 100644
--- a/infra/src/vnode_common.c
+++ b/infra/src/vnode_common.c
@@ -221,6 +221,9 @@ static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * con
}
vnode->cons = cons;
+
+ /* reset the shared credict counter */
+ rte_atomic32_set(&vnode->shared_credict_counter, (int32_t)vnode->sz_shared);
return 0;
error:
diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h
index c089bf8..ce1998d 100644
--- a/infra/src/vnode_common.h
+++ b/infra/src/vnode_common.h
@@ -157,6 +157,12 @@ do { \
do {} while(0)
#endif
+#define VNODE_STAT_UPDATE_AVG(desc, queue, item, value) \
+do { \
+ desc->stat[queue].item += (typeof(desc->stat[queue].item))(0.2 * (value - desc->stat[queue].item)); \
+} while(0)
+
+
static inline struct tunnel_desc ** tunnel_block_locate(struct tunnel_block * block,
unsigned int prodq_id, unsigned int consq_id)
{
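
The VNODE_STAT_UPDATE_AVG macro above maintains an exponentially weighted moving average with a smoothing factor of 0.2, i.e. each sample moves the stored value one fifth of the way toward it: avg += 0.2 * (value - avg). (In this patch its two call sites in dist_tunnel_flush are still compiled out behind #if 0.) The sketch below reproduces the same update outside the macro; ewma_update is an illustrative helper, not part of the patch.

#include <stdint.h>
#include <stdio.h>

/* Same smoothing as VNODE_STAT_UPDATE_AVG: avg += 0.2 * (value - avg).
 * ewma_update is illustrative, not part of the patch. */
static int64_t ewma_update(int64_t avg, int64_t value)
{
    /* Cast back to the integer type, as the macro does; deltas smaller
     * than 5 in magnitude truncate to zero and leave avg unchanged. */
    return avg + (int64_t)(0.2 * (double)(value - avg));
}

int main(void)
{
    int64_t ring_elem_count_avg = 0;
    const int64_t samples[] = { 100, 100, 100, 40, 40 };
    for (unsigned int i = 0; i < sizeof(samples) / sizeof(samples[0]); i++) {
        ring_elem_count_avg = ewma_update(ring_elem_count_avg, samples[i]);
        printf("sample %lld -> avg %lld\n",
               (long long)samples[i], (long long)ring_elem_count_avg);
    }
    return 0;
}
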
diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c
index e878bbd..d613618 100644
--- a/infra/src/vnode_mirror.c
+++ b/infra/src/vnode_mirror.c
@@ -28,19 +28,21 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
return;
}
- unsigned int nr_ring_count = rte_ring_count(desc->tunnel_object);
+ unsigned int nr_ring_count = desc->shared_credict_counter != NULL ? rte_ring_count(desc->tunnel_object) : 0;
unsigned int nr_ring_to_use = nr_ring_count + desc->sz_en_buffer_used;
unsigned int nr_shared_credict = 0;
if (nr_ring_to_use > desc->tunnel_exclusive_size)
{
/* need to apply shared credict */
- nr_shared_credict = nr_ring_to_use - desc->tunnel_exclusive_size;
- uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter);
+ assert(nr_ring_to_use >= (desc->tunnel_exclusive_size + desc->shared_credict_used));
+ nr_shared_credict = nr_ring_to_use - (desc->tunnel_exclusive_size + desc->shared_credict_used);
while (1)
{
+ uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter);
uint32_t new_value = cur_value > nr_shared_credict ? cur_value - nr_shared_credict : 0;
+
if (rte_atomic32_cmpset((volatile uint32_t *)desc->shared_credict_counter, cur_value, new_value))
{
nr_shared_credict = cur_value > new_value ? cur_value - new_value : 0;
@@ -49,9 +51,12 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
}
desc->shared_credict_used += nr_shared_credict;
+ assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
}
- unsigned int n_can_send = desc->tunnel_exclusive_size + nr_shared_credict - nr_ring_count;
+ assert((desc->tunnel_exclusive_size + desc->shared_credict_used) >= nr_ring_count);
+
+ unsigned int n_can_send = (desc->tunnel_exclusive_size + desc->shared_credict_used) - nr_ring_count;
unsigned int n_to_send = RTE_MIN(desc->sz_en_buffer_used, n_can_send);
size_t n_send_len = 0;
@@ -75,21 +80,29 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
}
unsigned int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, NULL);
+ unsigned int n_send_missed = desc->sz_en_buffer_used - n_send;
- /* no packets dropped */
- if (likely(n_send == desc->sz_en_buffer_used))
+ /* some packets were missed */
+ if (unlikely(n_send_missed != 0))
{
- goto out;
- }
+ for (unsigned int k = n_send; k < desc->sz_en_buffer_used; k++)
+ {
+ struct rte_mbuf * object_to_be_free = desc->en_buffer[k];
+ n_send_len -= rte_pktmbuf_data_len(object_to_be_free);
+ rte_pktmbuf_free(object_to_be_free);
+ }
- for (unsigned int k = n_send; k < desc->sz_en_buffer_used; k++)
- {
- struct rte_mbuf * object_to_be_free = desc->en_buffer[k];
- n_send_len -= rte_pktmbuf_data_len(object_to_be_free);
- rte_pktmbuf_free(object_to_be_free);
+ /* return the shared credict */
+ unsigned int nr_shared_credict_to_release = RTE_MIN(n_send_missed, nr_shared_credict);
+ if (nr_shared_credict_to_release > 0)
+ {
+ rte_atomic32_add(desc->shared_credict_counter, (int32_t)nr_shared_credict_to_release);
+ desc->shared_credict_used -= nr_shared_credict_to_release;
+ }
+
+ assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
}
-out:
cons_notify_ctx = &cons->notify[consq];
if (cons_notify_ctx->enable)
{
@@ -101,21 +114,27 @@ out:
}
// update producer statistics counters
- VNODE_STAT_UPDATE(prod, prodq, on_line, n_to_send);
+ VNODE_STAT_UPDATE(prod, prodq, on_line, desc->sz_en_buffer_used);
VNODE_STAT_UPDATE(prod, prodq, deliver, n_send);
- VNODE_STAT_UPDATE(prod, prodq, missed, n_to_send - n_send);
+ VNODE_STAT_UPDATE(prod, prodq, missed, n_send_missed);
VNODE_STAT_UPDATE(prod, prodq, total_len, n_send_len);
// update consumer statistics counters
- VNODE_STAT_UPDATE(cons, consq, on_line, n_to_send);
+ VNODE_STAT_UPDATE(cons, consq, on_line, desc->sz_en_buffer_used);
VNODE_STAT_UPDATE(cons, consq, deliver, n_send);
- VNODE_STAT_UPDATE(cons, consq, missed, n_to_send - n_send);
+ VNODE_STAT_UPDATE(cons, consq, missed, n_send_missed);
VNODE_STAT_UPDATE(cons, consq, total_len, n_send_len);
// Batch size
- VNODE_STAT_UPDATE(prod, prodq, batch_size_total, n_to_send);
+#if 0
+ VNODE_STAT_UPDATE(prod, prodq, batch_size_total, desc->sz_en_buffer_used);
VNODE_STAT_UPDATE(prod, prodq, batch_size_count, 1);
+ // Ring -- Prod
+ VNODE_STAT_UPDATE_AVG(prod, prodq, ring_elem_count_avg, nr_ring_count);
+ VNODE_STAT_UPDATE_AVG(prod, prodq, ring_shared_credict_avg, nr_shared_credict);
+#endif
+
// clear the enqueue buffer
desc->sz_en_buffer_used = 0;
}
@@ -127,6 +146,10 @@ static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_co
assert(rte_spinlock_trylock(&desc->lock_thread_safe_check));
#endif
+ desc->en_buffer[desc->sz_en_buffer_used++] = obj;
+ assert(desc->sz_en_buffer_used <= desc->sz_en_buffer);
+
+#if 0
// append the object at the tail of enqueue buffer.
unsigned int pos;
pos = desc->sz_en_buffer_used;
@@ -142,14 +165,15 @@ static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_co
}
desc->sz_en_buffer_used = desc->sz_en_buffer;
+#endif
#if 0
dist_tunnel_flush(prod, cons, prodq, consq, desc);
#endif
-out:
-
#if VNODE_CHECK_THREAD_SAFE
+
+out:
rte_spinlock_unlock(&desc->lock_thread_safe_check);
#endif
@@ -159,14 +183,15 @@ out:
static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int nr_max_obj)
{
unsigned int nr_deq = rte_ring_sc_dequeue_burst(desc->tunnel_object, obj, nr_max_obj, NULL);
-
unsigned int shared_credict_to_release = RTE_MIN(nr_deq, desc->shared_credict_used);
+
if (shared_credict_to_release > 0)
{
- rte_atomic32_add(desc->shared_credict_counter, (int32_t)desc->shared_credict_used);
+ rte_atomic32_add(desc->shared_credict_counter, (int32_t)shared_credict_to_release);
desc->shared_credict_used -= shared_credict_to_release;
}
+ assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size));
return (int)nr_deq;
}
@@ -176,29 +201,22 @@ static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int
static inline void dist_tunnel_block_flush(struct tunnel_block * block, int prodq)
{
- struct tunnel_desc * tunnel;
- unsigned int consq;
-
- for (consq = 0; consq < block->nr_consq; consq++)
+ for (unsigned int consq = 0; consq < block->nr_consq; consq++)
{
- tunnel = *tunnel_block_locate(block, prodq, consq);
+ struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
dist_tunnel_flush(block->prod, block->cons, prodq, consq, tunnel);
}
-
- return;
}
static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, int prodq, struct rte_mbuf * obj[],
uint32_t hash[], int nr_obj)
{
assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST);
- struct tunnel_desc * tunnel;
-
for (unsigned int i = 0; i < nr_obj; i++)
{
assert(obj[i] != NULL);
unsigned int consq = hash[i] % block->nr_consq;
- tunnel = *tunnel_block_locate(block, prodq, consq);
+ struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq);
dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[i]);
}
@@ -318,7 +336,7 @@ int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, stru
{
assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST);
- struct tunnel_block * block = prod->block;
+ struct tunnel_block * block = ACCESS_ONCE(prod->block);
if (unlikely(block == NULL))
{
goto failure;
@@ -341,7 +359,7 @@ failure:
int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[],
int nr_max_objects)
{
- struct tunnel_block * block = cons->block;
+ struct tunnel_block * block = ACCESS_ONCE(cons->block);
if (likely(block != NULL))
{
return dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects);
@@ -376,7 +394,7 @@ struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive,
vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_cycles() / US_PER_S;
vnode_common->sz_shared = sz_shared;
- rte_atomic32_set(&vnode_common->shared_credict_counter, (int32_t)sz_shared);
+ rte_atomic32_init(&vnode_common->shared_credict_counter);
return vnode_common;
}
diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc
index a72bb78..24752d7 100644
--- a/infra/test/TestVNode.cc
+++ b/infra/test/TestVNode.cc
@@ -261,6 +261,29 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict)
test_thread_3.join();
test_thread_4.join();
+ unsigned int prod_on_line_total = 0;
+ unsigned int prod_deliver_total = 0;
+ unsigned int prod_missed_total = 0;
+
+ /* verify the stats */
+ struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod);
+ for (unsigned int i = 0; i < 4; i++)
+ {
+ prod_on_line_total += prod_stat[i].on_line;
+ prod_deliver_total += prod_stat[i].deliver;
+ prod_missed_total += prod_stat[i].missed;
+ }
+
+ EXPECT_EQ(prod_on_line_total, 2048);
+ EXPECT_EQ(prod_deliver_total, 2048);
+ EXPECT_EQ(prod_missed_total, 0);
+
+ /* on cons side */
+ struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons);
+ EXPECT_EQ(cons_stat[0].on_line, 2048);
+ EXPECT_EQ(cons_stat[0].deliver, 2048);
+ EXPECT_EQ(cons_stat[0].missed, 0);
+
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, enq_objs, RTE_DIM(enq_objs));
EXPECT_EQ(deq_ret, 2048);
@@ -308,6 +331,16 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
EXPECT_EQ(deq_ret_1, 544);
+ struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod);
+ EXPECT_EQ(prod_stat->on_line, 1024);
+ EXPECT_EQ(prod_stat->deliver, 544);
+ EXPECT_EQ(prod_stat->missed, 480);
+
+ struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons);
+ EXPECT_EQ(cons_stat->on_line, 1024);
+ EXPECT_EQ(cons_stat->deliver, 544);
+ EXPECT_EQ(cons_stat->missed, 480);
+
/* free these mbufs */
rte_pktmbuf_free_bulk(deq_objs, deq_ret_1);
@@ -321,6 +354,15 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
EXPECT_EQ(deq_ret_2, 544);
+ /* another round, the stat should be double */
+ EXPECT_EQ(prod_stat->on_line, 2048);
+ EXPECT_EQ(prod_stat->deliver, 1088);
+ EXPECT_EQ(prod_stat->missed, 960);
+
+ EXPECT_EQ(cons_stat->on_line, 2048);
+ EXPECT_EQ(cons_stat->deliver, 1088);
+ EXPECT_EQ(cons_stat->missed, 960);
+
rte_pktmbuf_free_bulk(deq_objs, deq_ret_2);
vnode_mirror_delete(vnode_ptr);
}
diff --git a/service/src/vdata.c b/service/src/vdata.c
index e81558f..a565fb4 100644
--- a/service/src/vdata.c
+++ b/service/src/vdata.c
@@ -93,7 +93,7 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta
stat_info->rx_on_line[i] = VNODE_STAT_READ(&st_prod_rx[i].on_line);
stat_info->rx_deliver[i] = VNODE_STAT_READ(&st_prod_rx[i].deliver);
stat_info->rx_missed[i] = VNODE_STAT_READ(&st_prod_rx[i].missed);
- stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].on_line);
+ stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].total_len);
#if 0
stat_info->notify_state_running[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_running);