summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bbq/include/bbq.h30
-rw-r--r--bbq/src/bbq.c42
-rw-r--r--bbq/tests/common/test_queue.c4
-rw-r--r--bbq/tests/unittest/ut_head_cursor.cc28
-rw-r--r--bbq/tests/unittest/ut_mix.cc28
5 files changed, 83 insertions, 49 deletions
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
index d752130..3f0c355 100644
--- a/bbq/include/bbq.h
+++ b/bbq/include/bbq.h
@@ -1,6 +1,6 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-07-01 09:18:56
+ * @LastEditTime: 2024-07-01 23:12:49
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
@@ -25,7 +25,8 @@ using aotmic_uint64 = std::atomic<uint64_t>;
#define BBQ_SOCKET_ID_ANY -1
#define BBQ_SYMBOL_MAX 64
-#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64)))
+#define BBQ_CACHE_LINE 64
+#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(BBQ_CACHE_LINE)))
struct bbq_atomic64 {
bool single; // 如果为单生产者或单消费者,则single为true
@@ -33,22 +34,31 @@ struct bbq_atomic64 {
volatile uint64_t s; // single使用该字段
aotmic_uint64 m;
};
+};
+
+struct bbq_head {
+ struct bbq_atomic64 value; // head值
+ struct bbq_atomic64 count; // 出/入队个数统计(create时设置了统计flag才生效)
} __BBQ_CACHE_ALIGNED;
struct bbq_block {
+ // cache line 1
struct bbq_atomic64 committed; // 已提交(version|offset)
struct bbq_atomic64 allocated; // 已分配(version|offset)
struct bbq_atomic64 reserved; // 已预留(version|offset)
struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到
- char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size
+ // cache line 2
+ char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size
} __BBQ_CACHE_ALIGNED;
typedef void *(*bbq_malloc_f)(int32_t socket_id, size_t size);
typedef void (*bbq_free_f)(void *ptr, size_t size);
struct bbq {
+ // cache-line 1
char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED;
+ // cache-line 2
int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
uint32_t bn; // blocks的个数
uint32_t bs; // blocks.entries的个数
@@ -61,21 +71,19 @@ struct bbq {
bbq_malloc_f malloc_f; // 申请内存的函数,默认为malloc
bbq_free_f free_f; // 申请内存的函数,默认为free
- struct bbq_atomic64 phead; // 生产者头,指向块的索引,分为两部分:version|idx
- struct bbq_atomic64 chead; // 消费者头,指向块的索引,分为两部分:version|idx
+ // cache-line 3
+ struct bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
+ // cache-line 4
+ struct bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
- struct {
- struct bbq_atomic64 n_enq;
- struct bbq_atomic64 n_deq;
- } stat;
+ // cache-line 5
struct bbq_block *blocks; // bn大小的数组
-
#ifdef TEST_PERF_MEM
struct {
char *ptr; // 内存池起始地址
size_t off; // 已使用的偏移大小
size_t size; // 内存池总大小
- } memory_pool;
+ } memory_pool; // 仅在初始化和调试时会读写
#endif
} __BBQ_CACHE_ALIGNED;
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index 90bcdbe..b56c6cc 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-07-01 05:20:02
+ * @LastEditTime: 2024-07-01 23:13:37
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -124,7 +124,7 @@ void *bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t
bbq_memory_g.malloc_cnt++;
bbq_memory_g.malloc_size += size;
#endif
- return malloc(size);
+ return aligned_alloc(BBQ_CACHE_LINE, size);
}
void bbq_free_def_callback(void *ptr,
@@ -361,10 +361,10 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
q->entry_size = obj_size;
q->socket_id = socket_id;
if (BBQ_F_CHK_SP_ENQ(flags)) {
- q->phead.single = true;
+ q->phead.value.single = true;
}
if (BBQ_F_CHK_SC_DEQ(flags)) {
- q->chead.single = true;
+ q->chead.value.single = true;
}
q->flags = flags;
q->malloc_f = malloc_f;
@@ -607,13 +607,13 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
bbq_fetch_max(&n_blk->allocated, new_vsn);
// ph+1,当超过索引范围,进入下一轮时,version会自动+1
- bbq_fetch_max(&q->phead, ph + 1);
+ bbq_fetch_max(&q->phead.value, ph + 1);
return BBQ_SUCCESS;
}
-static uint32_t bbq_wait_consumed_get_by_stat(struct bbq *q, uint64_t enq_update, uint64_t deq_update) {
- uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->stat.n_enq) : enq_update;
- uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->stat.n_deq) : deq_update;
+static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) {
+ uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count) : enq_update;
+ uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count) : deq_update;
return enq_now - deq_now;
}
@@ -627,13 +627,13 @@ __attribute__((unused)) static uint32_t bbq_wait_consumed_get_by_head(struct bbq
if (ch_ptr != NULL) {
ch = *ch_ptr;
} else {
- ch = bbq_atomic64_load(&q->chead);
+ ch = bbq_atomic64_load(&q->chead.value);
}
if (ph_ptr != NULL) {
ph = *ph_ptr;
} else {
- ph = bbq_atomic64_load(&q->phead);
+ ph = bbq_atomic64_load(&q->phead.value);
}
uint64_t ph_idx = bbq_head_idx(q, ph);
@@ -679,7 +679,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
while (true) {
- uint64_t ph = bbq_atomic64_load(&q->phead);
+ uint64_t ph = bbq_atomic64_load(&q->phead.value);
struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n);
switch (state.state) {
@@ -689,7 +689,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
ret.status = BBQ_OK;
if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
- enq_update = bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst) + state.e.actual_burst;
+ enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst) + state.e.actual_burst;
}
break;
case BBQ_BLOCK_DONE: {
@@ -718,7 +718,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) {
- *wait_consumed = bbq_wait_consumed_get_by_stat(q, enq_update, 0);
+ *wait_consumed = bbq_wait_consumed_get(q, enq_update, 0);
}
return ret;
@@ -891,7 +891,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
bbq_fetch_max(&n_blk->reserved, new_vsn);
}
- bbq_fetch_max(&q->chead, ch + 1);
+ bbq_fetch_max(&q->chead.value, ch + 1);
return true;
}
@@ -906,7 +906,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
while (true) {
- uint64_t ch = bbq_atomic64_load(&q->chead);
+ uint64_t ch = bbq_atomic64_load(&q->chead.value);
struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]);
struct bbq_queue_state_s state;
state = bbq_reserve_entry(q, blk, n);
@@ -920,7 +920,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
ret.actual_burst = state.e.actual_burst;
if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
- deq_update = bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst) + state.e.actual_burst;
+ deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst) + state.e.actual_burst;
}
break;
case BBQ_NO_ENTRY:
@@ -945,7 +945,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) {
- *wait_consumed = bbq_wait_consumed_get_by_stat(q, 0, deq_update);
+ *wait_consumed = bbq_wait_consumed_get(q, 0, deq_update);
}
return ret;
@@ -1082,8 +1082,8 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *ob
}
bool bbq_empty(struct bbq *q) {
- uint64_t phead = bbq_atomic64_load(&q->phead);
- uint64_t chead = bbq_atomic64_load(&q->chead);
+ uint64_t phead = bbq_atomic64_load(&q->phead.value);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value);
uint64_t ph_vsn = bbq_head_vsn(q, phead);
uint64_t ch_vsn = bbq_head_vsn(q, chead);
@@ -1209,8 +1209,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
void bbq_debug_struct_print(struct bbq *q) {
printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new");
- uint64_t phead = bbq_atomic64_load(&q->phead);
- uint64_t chead = bbq_atomic64_load(&q->chead);
+ uint64_t phead = bbq_atomic64_load(&q->phead.value);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value);
printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));
diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c
index e72f1ee..045e63c 100644
--- a/bbq/tests/common/test_queue.c
+++ b/bbq/tests/common/test_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-07-01 03:56:01
+ * @LastEditTime: 2024-07-01 05:23:47
* @Describe: TODO
*/
@@ -24,7 +24,7 @@ uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16
}
int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
-#if 1
+#if 0
// 开启了BBQ_F_ENABLE_STAT 会导致性能下降
unsigned int flags = BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT;
#else
diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc
index bbd2296..3c25d9d 100644
--- a/bbq/tests/unittest/ut_head_cursor.cc
+++ b/bbq/tests/unittest/ut_head_cursor.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-07-01 03:57:33
+ * @LastEditTime: 2024-07-01 22:34:42
* @Describe: TODO
*/
@@ -33,13 +33,13 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
};
void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
- uint64_t ph = bbq_atomic64_load(&q->phead);
+ uint64_t ph = bbq_atomic64_load(&q->phead.value);
EXPECT_EQ(bbq_head_idx(q, ph), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, ph), vsn) << "line: " << line;
}
void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
- uint64_t ch = bbq_atomic64_load(&q->chead);
+ uint64_t ch = bbq_atomic64_load(&q->chead.value);
EXPECT_EQ(bbq_head_idx(q, ch), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, ch), vsn) << "line: " << line;
}
@@ -78,8 +78,8 @@ TEST_F(bbq_head_cursor, init) {
// 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目
- EXPECT_EQ(bbq_atomic64_load(&q->phead), 0);
- EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->phead.value), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0);
expect_eq_allocated(q, &q->blocks[0], 0, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], 0, 0, __LINE__);
@@ -115,8 +115,8 @@ void ut_produce_something(uint32_t produce_cnt) {
EXPECT_TRUE(ret == BBQ_OK);
}
- EXPECT_EQ(bbq_atomic64_load(&q->phead), 0);
- EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->phead.value), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0);
expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__);
@@ -129,8 +129,8 @@ void ut_produce_something(uint32_t produce_cnt) {
EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC);
}
- EXPECT_EQ(bbq_atomic64_load(&q->phead), 0);
- EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->phead.value), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0);
expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_reserved(q, &q->blocks[0], produce_cnt, 0, __LINE__);
@@ -174,7 +174,7 @@ void ut_produce_next_block(uint32_t over) {
EXPECT_TRUE(ret == BBQ_OK);
}
- EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value), 0);
expect_phead(q, 1, 0, __LINE__);
expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__);
@@ -301,8 +301,8 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
EXPECT_TRUE(ret == BBQ_ERR_FULL);
}
- ph = bbq_atomic64_load(&q->phead);
- ch = bbq_atomic64_load(&q->chead);
+ ph = bbq_atomic64_load(&q->phead.value);
+ ch = bbq_atomic64_load(&q->chead.value);
if (i == 0) {
EXPECT_EQ((ph + 1) & q->idx_mask, ch & q->idx_mask);
} else {
@@ -329,8 +329,8 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
EXPECT_TRUE(ret == BBQ_ERR_EMPTY);
}
- ph = bbq_atomic64_load(&q->phead);
- ch = bbq_atomic64_load(&q->chead);
+ ph = bbq_atomic64_load(&q->phead.value);
+ ch = bbq_atomic64_load(&q->chead.value);
EXPECT_EQ(ph & q->idx_mask, ch & q->idx_mask);
for (uint32_t i = 0; i < q->bn; i++) {
EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs);
diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc
index cd545bf..81b362f 100644
--- a/bbq/tests/unittest/ut_mix.cc
+++ b/bbq/tests/unittest/ut_mix.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-27 07:25:43
+ * @LastEditTime: 2024-07-01 23:09:56
* @Describe: bbq除了队列操作外,其他函数的测试
*/
@@ -148,3 +148,29 @@ TEST_F(bbq_mix, bbq_block_number_calc) {
}
}
}
+
+#define OFFSETOF(type, member) ((size_t) & ((type *)0)->member)
+#define PRINT_OFFSETOF(type, member) printf("Offset of '%s' in '%s' is %zu\n", #member, #type, OFFSETOF(type, member))
+#define PTR_ALIGNED_64(ptr) (((uintptr_t)ptr & (64 - 1)) == 0)
+
+TEST_F(bbq_mix, bbq_cache_line) {
+
+ // 创建队列
+ struct bbq *q = bbq_create("test_bbq", 4096, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL);
+ printf("q:%p\n", q);
+ ASSERT_NE(q, nullptr);
+
+ // 首地址64字节对齐
+ ASSERT_EQ(PTR_ALIGNED_64(q), true);
+
+ // 关键成员64字节对齐
+ ASSERT_EQ(PTR_ALIGNED_64(&q->name), true);
+ ASSERT_EQ(PTR_ALIGNED_64(&q->socket_id), true);
+ ASSERT_EQ(PTR_ALIGNED_64(&q->phead), true);
+ ASSERT_EQ(PTR_ALIGNED_64(&q->chead), true);
+ ASSERT_EQ(PTR_ALIGNED_64(&q->blocks), true);
+ ASSERT_EQ(PTR_ALIGNED_64(&q->blocks[0].committed), true);
+ ASSERT_EQ(PTR_ALIGNED_64(&q->blocks[0].entries), true);
+
+ bbq_destory(q);
+} \ No newline at end of file