diff options
Diffstat (limited to 'bbq')
| -rw-r--r-- | bbq/include/bbq.h | 30 | ||||
| -rw-r--r-- | bbq/src/bbq.c | 42 | ||||
| -rw-r--r-- | bbq/tests/common/test_queue.c | 4 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_head_cursor.cc | 28 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_mix.cc | 28 |
5 files changed, 83 insertions, 49 deletions
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index d752130..3f0c355 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: [email protected] - * @LastEditTime: 2024-07-01 09:18:56 + * @LastEditTime: 2024-07-01 23:12:49 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -25,7 +25,8 @@ using aotmic_uint64 = std::atomic<uint64_t>; #define BBQ_SOCKET_ID_ANY -1 #define BBQ_SYMBOL_MAX 64 -#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) +#define BBQ_CACHE_LINE 64 +#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(BBQ_CACHE_LINE))) struct bbq_atomic64 { bool single; // 如果为单生产者或单消费者,则single为true @@ -33,22 +34,31 @@ struct bbq_atomic64 { volatile uint64_t s; // single使用该字段 aotmic_uint64 m; }; +}; + +struct bbq_head { + struct bbq_atomic64 value; // head值 + struct bbq_atomic64 count; // 出/入队个数统计(create时设置了统计flag才生效) } __BBQ_CACHE_ALIGNED; struct bbq_block { + // cache line 1 struct bbq_atomic64 committed; // 已提交(version|offset) struct bbq_atomic64 allocated; // 已分配(version|offset) struct bbq_atomic64 reserved; // 已预留(version|offset) struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size + // cache line 2 + char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; typedef void *(*bbq_malloc_f)(int32_t socket_id, size_t size); typedef void (*bbq_free_f)(void *ptr, size_t size); struct bbq { + // cache-line 1 char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; + // cache-line 2 int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 uint32_t bn; // blocks的个数 uint32_t bs; // blocks.entries的个数 @@ -61,21 +71,19 @@ struct bbq { bbq_malloc_f malloc_f; // 申请内存的函数,默认为malloc bbq_free_f free_f; // 申请内存的函数,默认为free - struct bbq_atomic64 phead; // 生产者头,指向块的索引,分为两部分:version|idx - struct bbq_atomic64 chead; // 消费者头,指向块的索引,分为两部分:version|idx + // cache-line 3 + struct bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + // cache-line 4 + struct bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx - struct { - struct bbq_atomic64 n_enq; - struct bbq_atomic64 n_deq; - } stat; + // cache-line 5 struct bbq_block *blocks; // bn大小的数组 - #ifdef TEST_PERF_MEM struct { char *ptr; // 内存池起始地址 size_t off; // 已使用的偏移大小 size_t size; // 内存池总大小 - } memory_pool; + } memory_pool; // 仅在初始化和调试时会读写 #endif } __BBQ_CACHE_ALIGNED; diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index 90bcdbe..b56c6cc 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-07-01 05:20:02 + * @LastEditTime: 2024-07-01 23:13:37 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -124,7 +124,7 @@ void *bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t bbq_memory_g.malloc_cnt++; bbq_memory_g.malloc_size += size; #endif - return malloc(size); + return aligned_alloc(BBQ_CACHE_LINE, size); } void bbq_free_def_callback(void *ptr, @@ -361,10 +361,10 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, q->entry_size = obj_size; q->socket_id = socket_id; if (BBQ_F_CHK_SP_ENQ(flags)) { - q->phead.single = true; + q->phead.value.single = true; } if (BBQ_F_CHK_SC_DEQ(flags)) { - q->chead.single = true; + q->chead.value.single = true; } q->flags = flags; q->malloc_f = malloc_f; @@ -607,13 +607,13 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { bbq_fetch_max(&n_blk->allocated, new_vsn); // ph+1,当超过索引范围,进入下一轮时,version会自动+1 - bbq_fetch_max(&q->phead, ph + 1); + bbq_fetch_max(&q->phead.value, ph + 1); return BBQ_SUCCESS; } -static uint32_t bbq_wait_consumed_get_by_stat(struct bbq *q, uint64_t enq_update, uint64_t deq_update) { - uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->stat.n_enq) : enq_update; - uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->stat.n_deq) : deq_update; +static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) { + uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count) : enq_update; + uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count) : deq_update; return enq_now - deq_now; } @@ -627,13 +627,13 @@ __attribute__((unused)) static uint32_t bbq_wait_consumed_get_by_head(struct bbq if (ch_ptr != NULL) { ch = *ch_ptr; } else { - ch = bbq_atomic64_load(&q->chead); + ch = bbq_atomic64_load(&q->chead.value); } if (ph_ptr != NULL) { ph = *ph_ptr; } else { - ph = bbq_atomic64_load(&q->phead); + ph = bbq_atomic64_load(&q->phead.value); } uint64_t ph_idx = bbq_head_idx(q, ph); @@ -679,7 +679,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } while (true) { - uint64_t ph = bbq_atomic64_load(&q->phead); + uint64_t ph = bbq_atomic64_load(&q->phead.value); struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n); switch (state.state) { @@ -689,7 +689,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t ret.status = BBQ_OK; if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { - enq_update = bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst) + state.e.actual_burst; + enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst) + state.e.actual_burst; } break; case BBQ_BLOCK_DONE: { @@ -718,7 +718,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { - *wait_consumed = bbq_wait_consumed_get_by_stat(q, enq_update, 0); + *wait_consumed = bbq_wait_consumed_get(q, enq_update, 0); } return ret; @@ -891,7 +891,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { bbq_fetch_max(&n_blk->reserved, new_vsn); } - bbq_fetch_max(&q->chead, ch + 1); + bbq_fetch_max(&q->chead.value, ch + 1); return true; } @@ -906,7 +906,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } while (true) { - uint64_t ch = bbq_atomic64_load(&q->chead); + uint64_t ch = bbq_atomic64_load(&q->chead.value); struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; state = bbq_reserve_entry(q, blk, n); @@ -920,7 +920,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n ret.actual_burst = state.e.actual_burst; if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { - deq_update = bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst) + state.e.actual_burst; + deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst) + state.e.actual_burst; } break; case BBQ_NO_ENTRY: @@ -945,7 +945,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { - *wait_consumed = bbq_wait_consumed_get_by_stat(q, 0, deq_update); + *wait_consumed = bbq_wait_consumed_get(q, 0, deq_update); } return ret; @@ -1082,8 +1082,8 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *ob } bool bbq_empty(struct bbq *q) { - uint64_t phead = bbq_atomic64_load(&q->phead); - uint64_t chead = bbq_atomic64_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead.value); + uint64_t chead = bbq_atomic64_load(&q->chead.value); uint64_t ph_vsn = bbq_head_vsn(q, phead); uint64_t ch_vsn = bbq_head_vsn(q, chead); @@ -1209,8 +1209,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { void bbq_debug_struct_print(struct bbq *q) { printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); - uint64_t phead = bbq_atomic64_load(&q->phead); - uint64_t chead = bbq_atomic64_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead.value); + uint64_t chead = bbq_atomic64_load(&q->chead.value); printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c index e72f1ee..045e63c 100644 --- a/bbq/tests/common/test_queue.c +++ b/bbq/tests/common/test_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-07-01 03:56:01 + * @LastEditTime: 2024-07-01 05:23:47 * @Email: [email protected] * @Describe: TODO */ @@ -24,7 +24,7 @@ uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16 } int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { -#if 1 +#if 0 // 开启了BBQ_F_ENABLE_STAT 会导致性能下降 unsigned int flags = BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT; #else diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc index bbd2296..3c25d9d 100644 --- a/bbq/tests/unittest/ut_head_cursor.cc +++ b/bbq/tests/unittest/ut_head_cursor.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-07-01 03:57:33 + * @LastEditTime: 2024-07-01 22:34:42 * @Email: [email protected] * @Describe: TODO */ @@ -33,13 +33,13 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test }; void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - uint64_t ph = bbq_atomic64_load(&q->phead); + uint64_t ph = bbq_atomic64_load(&q->phead.value); EXPECT_EQ(bbq_head_idx(q, ph), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, ph), vsn) << "line: " << line; } void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - uint64_t ch = bbq_atomic64_load(&q->chead); + uint64_t ch = bbq_atomic64_load(&q->chead.value); EXPECT_EQ(bbq_head_idx(q, ch), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, ch), vsn) << "line: " << line; } @@ -78,8 +78,8 @@ TEST_F(bbq_head_cursor, init) { // 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目 - EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); - EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead.value), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0); expect_eq_allocated(q, &q->blocks[0], 0, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], 0, 0, __LINE__); @@ -115,8 +115,8 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_TRUE(ret == BBQ_OK); } - EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); - EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead.value), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0); expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__); @@ -129,8 +129,8 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC); } - EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); - EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead.value), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0); expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_reserved(q, &q->blocks[0], produce_cnt, 0, __LINE__); @@ -174,7 +174,7 @@ void ut_produce_next_block(uint32_t over) { EXPECT_TRUE(ret == BBQ_OK); } - EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0); expect_phead(q, 1, 0, __LINE__); expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__); @@ -301,8 +301,8 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { EXPECT_TRUE(ret == BBQ_ERR_FULL); } - ph = bbq_atomic64_load(&q->phead); - ch = bbq_atomic64_load(&q->chead); + ph = bbq_atomic64_load(&q->phead.value); + ch = bbq_atomic64_load(&q->chead.value); if (i == 0) { EXPECT_EQ((ph + 1) & q->idx_mask, ch & q->idx_mask); } else { @@ -329,8 +329,8 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { EXPECT_TRUE(ret == BBQ_ERR_EMPTY); } - ph = bbq_atomic64_load(&q->phead); - ch = bbq_atomic64_load(&q->chead); + ph = bbq_atomic64_load(&q->phead.value); + ch = bbq_atomic64_load(&q->chead.value); EXPECT_EQ(ph & q->idx_mask, ch & q->idx_mask); for (uint32_t i = 0; i < q->bn; i++) { EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs); diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc index cd545bf..81b362f 100644 --- a/bbq/tests/unittest/ut_mix.cc +++ b/bbq/tests/unittest/ut_mix.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-27 07:25:43 + * @LastEditTime: 2024-07-01 23:09:56 * @Email: [email protected] * @Describe: bbq除了队列操作外,其他函数的测试 */ @@ -148,3 +148,29 @@ TEST_F(bbq_mix, bbq_block_number_calc) { } } } + +#define OFFSETOF(type, member) ((size_t) & ((type *)0)->member) +#define PRINT_OFFSETOF(type, member) printf("Offset of '%s' in '%s' is %zu\n", #member, #type, OFFSETOF(type, member)) +#define PTR_ALIGNED_64(ptr) (((uintptr_t)ptr & (64 - 1)) == 0) + +TEST_F(bbq_mix, bbq_cache_line) { + + // 创建队列 + struct bbq *q = bbq_create("test_bbq", 4096, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); + printf("q:%p\n", q); + ASSERT_NE(q, nullptr); + + // 首地址64字节对齐 + ASSERT_EQ(PTR_ALIGNED_64(q), true); + + // 关键成员64字节对齐 + ASSERT_EQ(PTR_ALIGNED_64(&q->name), true); + ASSERT_EQ(PTR_ALIGNED_64(&q->socket_id), true); + ASSERT_EQ(PTR_ALIGNED_64(&q->phead), true); + ASSERT_EQ(PTR_ALIGNED_64(&q->chead), true); + ASSERT_EQ(PTR_ALIGNED_64(&q->blocks), true); + ASSERT_EQ(PTR_ALIGNED_64(&q->blocks[0].committed), true); + ASSERT_EQ(PTR_ALIGNED_64(&q->blocks[0].entries), true); + + bbq_destory(q); +}
\ No newline at end of file |
