diff options
| author | Lu Qiuwen <[email protected]> | 2023-09-01 10:19:04 +0800 |
|---|---|---|
| committer | Lu Qiuwen <[email protected]> | 2023-09-01 15:06:41 +0800 |
| commit | 42a34334ebe495254b43a4e532bb3d9292e2ed42 (patch) | |
| tree | cc599d7dd58cd3bd4a5174839a2088797d5340c4 | |
| parent | 5e5afdd30f9274038a3ed6130a8f163cd4f4391b (diff) | |
调整shared_counter的计数方式,修正shared_counter成为负值的问题。 (tag: v4.6.49-20230901)
| -rw-r--r-- | infra/include/common.h | 1 | ||||
| -rw-r--r-- | infra/include/vnode.h | 4 | ||||
| -rw-r--r-- | infra/src/vnode_common.c | 3 | ||||
| -rw-r--r-- | infra/src/vnode_common.h | 6 | ||||
| -rw-r--r-- | infra/src/vnode_mirror.c | 90 | ||||
| -rw-r--r-- | infra/test/TestVNode.cc | 42 | ||||
| -rw-r--r-- | service/src/vdata.c | 2 |
7 files changed, 109 insertions, 39 deletions
diff --git a/infra/include/common.h b/infra/include/common.h index ef39b99..f88b0b8 100644 --- a/infra/include/common.h +++ b/infra/include/common.h @@ -248,6 +248,7 @@ static inline int parser_uint(const char * str) // TODO: 支持更多比特位的掩码 typedef uint64_t mask_t; +#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x)) /* ================================= MASK ======================================== */ static unsigned int inline mask_popcnt(mask_t mask) diff --git a/infra/include/vnode.h b/infra/include/vnode.h index 0593b22..e7d69ba 100644 --- a/infra/include/vnode.h +++ b/infra/include/vnode.h @@ -97,8 +97,8 @@ struct vnode_prod_stat volatile uint64_t missed; volatile uint64_t total_len; - volatile uint64_t cloned; - volatile uint64_t cloned_fail; + volatile int64_t ring_elem_count_avg; + volatile int64_t ring_shared_credict_avg; /* notify */ volatile uint64_t notify_state_waiting; diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index 1f077aa..db6e9c8 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -221,6 +221,9 @@ static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * con } vnode->cons = cons; + + /* reset the shared credict counter */ + rte_atomic32_set(&vnode->shared_credict_counter, (int32_t)vnode->sz_shared); return 0; error: diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h index c089bf8..ce1998d 100644 --- a/infra/src/vnode_common.h +++ b/infra/src/vnode_common.h @@ -157,6 +157,12 @@ do { \ do {} while(0) #endif +#define VNODE_STAT_UPDATE_AVG(desc, queue, item, value) \ +do { \ + desc->stat[queue].item += (typeof(desc->stat[queue].item))(0.2 * (value - desc->stat[queue].item)); \ +} while(0) + + static inline struct tunnel_desc ** tunnel_block_locate(struct tunnel_block * block, unsigned int prodq_id, unsigned int consq_id) { diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c index e878bbd..d613618 100644 --- a/infra/src/vnode_mirror.c +++ b/infra/src/vnode_mirror.c @@ 
-28,19 +28,21 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons return; } - unsigned int nr_ring_count = rte_ring_count(desc->tunnel_object); + unsigned int nr_ring_count = desc->shared_credict_counter != NULL ? rte_ring_count(desc->tunnel_object) : 0; unsigned int nr_ring_to_use = nr_ring_count + desc->sz_en_buffer_used; unsigned int nr_shared_credict = 0; if (nr_ring_to_use > desc->tunnel_exclusive_size) { /* need to apply shared credict */ - nr_shared_credict = nr_ring_to_use - desc->tunnel_exclusive_size; - uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter); + assert(nr_ring_to_use >= (desc->tunnel_exclusive_size + desc->shared_credict_used)); + nr_shared_credict = nr_ring_to_use - (desc->tunnel_exclusive_size + desc->shared_credict_used); while (1) { + uint32_t cur_value = rte_atomic32_read(desc->shared_credict_counter); uint32_t new_value = cur_value > nr_shared_credict ? cur_value - nr_shared_credict : 0; + if (rte_atomic32_cmpset((volatile uint32_t *)desc->shared_credict_counter, cur_value, new_value)) { nr_shared_credict = cur_value > new_value ? 
cur_value - new_value : 0; @@ -49,9 +51,12 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons } desc->shared_credict_used += nr_shared_credict; + assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size)); } - unsigned int n_can_send = desc->tunnel_exclusive_size + nr_shared_credict - nr_ring_count; + assert((desc->tunnel_exclusive_size + desc->shared_credict_used) >= nr_ring_count); + + unsigned int n_can_send = (desc->tunnel_exclusive_size + desc->shared_credict_used) - nr_ring_count; unsigned int n_to_send = RTE_MIN(desc->sz_en_buffer_used, n_can_send); size_t n_send_len = 0; @@ -75,21 +80,29 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons } unsigned int n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, NULL); + unsigned int n_send_missed = desc->sz_en_buffer_used - n_send; - /* 没有丢包 */ - if (likely(n_send == desc->sz_en_buffer_used)) + /* packet is missed */ + if (unlikely(n_send_missed != 0)) { - goto out; - } + for (unsigned int k = n_send; k < desc->sz_en_buffer_used; k++) + { + struct rte_mbuf * object_to_be_free = desc->en_buffer[k]; + n_send_len -= rte_pktmbuf_data_len(object_to_be_free); + rte_pktmbuf_free(object_to_be_free); + } - for (unsigned int k = n_send; k < desc->sz_en_buffer_used; k++) - { - struct rte_mbuf * object_to_be_free = desc->en_buffer[k]; - n_send_len -= rte_pktmbuf_data_len(object_to_be_free); - rte_pktmbuf_free(object_to_be_free); + /* return the shared credict */ + unsigned int nr_shared_credict_to_release = RTE_MIN(n_send_missed, nr_shared_credict); + if (nr_shared_credict_to_release > 0) + { + rte_atomic32_add(desc->shared_credict_counter, (int32_t)nr_shared_credict_to_release); + desc->shared_credict_used -= nr_shared_credict_to_release; + } + + assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size)); } -out: cons_notify_ctx = &cons->notify[consq]; if 
(cons_notify_ctx->enable) { @@ -101,21 +114,27 @@ out: } // 更新生产者统计计数 - VNODE_STAT_UPDATE(prod, prodq, on_line, n_to_send); + VNODE_STAT_UPDATE(prod, prodq, on_line, desc->sz_en_buffer_used); VNODE_STAT_UPDATE(prod, prodq, deliver, n_send); - VNODE_STAT_UPDATE(prod, prodq, missed, n_to_send - n_send); + VNODE_STAT_UPDATE(prod, prodq, missed, n_send_missed); VNODE_STAT_UPDATE(prod, prodq, total_len, n_send_len); // 更新消费者统计计数 - VNODE_STAT_UPDATE(cons, consq, on_line, n_to_send); + VNODE_STAT_UPDATE(cons, consq, on_line, desc->sz_en_buffer_used); VNODE_STAT_UPDATE(cons, consq, deliver, n_send); - VNODE_STAT_UPDATE(cons, consq, missed, n_to_send - n_send); + VNODE_STAT_UPDATE(cons, consq, missed, n_send_missed); VNODE_STAT_UPDATE(cons, consq, total_len, n_send_len); // Batch size - VNODE_STAT_UPDATE(prod, prodq, batch_size_total, n_to_send); +#if 0 + VNODE_STAT_UPDATE(prod, prodq, batch_size_total, desc->sz_en_buffer_used); VNODE_STAT_UPDATE(prod, prodq, batch_size_count, 1); + // Ring -- Prod + VNODE_STAT_UPDATE_AVG(prod, prodq, ring_elem_count_avg, nr_ring_count); + VNODE_STAT_UPDATE_AVG(prod, prodq, ring_shared_credict_avg, nr_shared_credict); +#endif + // 清空缓冲区 desc->sz_en_buffer_used = 0; } @@ -127,6 +146,10 @@ static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_co assert(rte_spinlock_trylock(&desc->lock_thread_safe_check)); #endif + desc->en_buffer[desc->sz_en_buffer_used++] = obj; + assert(desc->sz_en_buffer_used <= desc->sz_en_buffer); + +#if 0 // append the object at the tail of enqueue buffer. 
unsigned int pos; pos = desc->sz_en_buffer_used; @@ -142,14 +165,15 @@ static inline void dist_tunnel_enqueue(struct vnode_prod * prod, struct vnode_co } desc->sz_en_buffer_used = desc->sz_en_buffer; +#endif #if 0 dist_tunnel_flush(prod, cons, prodq, consq, desc); #endif -out: - #if VNODE_CHECK_THREAD_SAFE + +out: rte_spinlock_unlock(&desc->lock_thread_safe_check); #endif @@ -159,14 +183,15 @@ out: static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int nr_max_obj) { unsigned int nr_deq = rte_ring_sc_dequeue_burst(desc->tunnel_object, obj, nr_max_obj, NULL); - unsigned int shared_credict_to_release = RTE_MIN(nr_deq, desc->shared_credict_used); + if (shared_credict_to_release > 0) { - rte_atomic32_add(desc->shared_credict_counter, (int32_t)desc->shared_credict_used); + rte_atomic32_add(desc->shared_credict_counter, (int32_t)shared_credict_to_release); desc->shared_credict_used -= shared_credict_to_release; } + assert(desc->shared_credict_used <= (desc->tunnel_size - desc->tunnel_exclusive_size)); return (int)nr_deq; } @@ -176,29 +201,22 @@ static inline int dist_tunnel_dequeue(struct tunnel_desc * desc, void * obj, int static inline void dist_tunnel_block_flush(struct tunnel_block * block, int prodq) { - struct tunnel_desc * tunnel; - unsigned int consq; - - for (consq = 0; consq < block->nr_consq; consq++) + for (unsigned int consq = 0; consq < block->nr_consq; consq++) { - tunnel = *tunnel_block_locate(block, prodq, consq); + struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq); dist_tunnel_flush(block->prod, block->cons, prodq, consq, tunnel); } - - return; } static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, int prodq, struct rte_mbuf * obj[], uint32_t hash[], int nr_obj) { assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST); - struct tunnel_desc * tunnel; - for (unsigned int i = 0; i < nr_obj; i++) { assert(obj[i] != NULL); unsigned int consq = hash[i] % block->nr_consq; - tunnel = 
*tunnel_block_locate(block, prodq, consq); + struct tunnel_desc * tunnel = *tunnel_block_locate(block, prodq, consq); dist_tunnel_enqueue(block->prod, block->cons, prodq, consq, tunnel, obj[i]); } @@ -318,7 +336,7 @@ int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, stru { assert(nr_objects <= MR_LIBVNODE_MAX_SZ_BURST); - struct tunnel_block * block = prod->block; + struct tunnel_block * block = ACCESS_ONCE(prod->block); if (unlikely(block == NULL)) { goto failure; @@ -341,7 +359,7 @@ failure: int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[], int nr_max_objects) { - struct tunnel_block * block = cons->block; + struct tunnel_block * block = ACCESS_ONCE(cons->block); if (likely(block != NULL)) { return dist_tunnel_block_dequeue(block, consq, objects, nr_max_objects); @@ -376,7 +394,7 @@ struct vnode * vnode_mirror_create(const char * sym, unsigned int sz_exclusive, vnode_common->batch_interval_tsc = batch_interval_us * rte_get_timer_cycles() / US_PER_S; vnode_common->sz_shared = sz_shared; - rte_atomic32_set(&vnode_common->shared_credict_counter, (int32_t)sz_shared); + rte_atomic32_init(&vnode_common->shared_credict_counter); return vnode_common; } diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc index a72bb78..24752d7 100644 --- a/infra/test/TestVNode.cc +++ b/infra/test/TestVNode.cc @@ -261,6 +261,29 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueueUseSharedCredict) test_thread_3.join(); test_thread_4.join(); + unsigned int prod_on_line_total = 0; + unsigned int prod_deliver_total = 0; + unsigned int prod_missed_total = 0; + + /* verify the stats */ + struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod); + for (unsigned int i = 0; i < 4; i++) + { + prod_on_line_total += prod_stat[i].on_line; + prod_deliver_total += prod_stat[i].deliver; + prod_missed_total += prod_stat[i].missed; + } + + EXPECT_EQ(prod_on_line_total, 2048); + 
EXPECT_EQ(prod_deliver_total, 2048); + EXPECT_EQ(prod_missed_total, 0); + + /* on cons side */ + struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); + EXPECT_EQ(cons_stat[0].on_line, 2048); + EXPECT_EQ(cons_stat[0].deliver, 2048); + EXPECT_EQ(cons_stat[0].missed, 0); + int deq_ret = vnode_mirror_dequeue_burst(cons, 0, enq_objs, RTE_DIM(enq_objs)); EXPECT_EQ(deq_ret, 2048); @@ -308,6 +331,16 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); EXPECT_EQ(deq_ret_1, 544); + struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod); + EXPECT_EQ(prod_stat->on_line, 1024); + EXPECT_EQ(prod_stat->deliver, 544); + EXPECT_EQ(prod_stat->missed, 480); + + struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); + EXPECT_EQ(cons_stat->on_line, 1024); + EXPECT_EQ(cons_stat->deliver, 544); + EXPECT_EQ(cons_stat->missed, 480); + /* free these mbufs */ rte_pktmbuf_free_bulk(deq_objs, deq_ret_1); @@ -321,6 +354,15 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); EXPECT_EQ(deq_ret_2, 544); + /* another round, the stat should be double */ + EXPECT_EQ(prod_stat->on_line, 2048); + EXPECT_EQ(prod_stat->deliver, 1088); + EXPECT_EQ(prod_stat->missed, 960); + + EXPECT_EQ(cons_stat->on_line, 2048); + EXPECT_EQ(cons_stat->deliver, 1088); + EXPECT_EQ(cons_stat->missed, 960); + rte_pktmbuf_free_bulk(deq_objs, deq_ret_2); vnode_mirror_delete(vnode_ptr); } diff --git a/service/src/vdata.c b/service/src/vdata.c index e81558f..a565fb4 100644 --- a/service/src/vdata.c +++ b/service/src/vdata.c @@ -93,7 +93,7 @@ static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * sta stat_info->rx_on_line[i] = VNODE_STAT_READ(&st_prod_rx[i].on_line); stat_info->rx_deliver[i] = VNODE_STAT_READ(&st_prod_rx[i].deliver); stat_info->rx_missed[i] = 
VNODE_STAT_READ(&st_prod_rx[i].missed); - stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].on_line); + stat_info->rx_total_len[i] = VNODE_STAT_READ(&st_prod_rx[i].total_len); #if 0 stat_info->notify_state_running[i] = rte_atomic64_read(&st_prod_rx[i].notify_state_running); |
