summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorliuyu <[email protected]>2024-06-27 07:34:04 -0400
committerliuyu <[email protected]>2024-06-27 07:34:04 -0400
commitcc654f704fe3caf3c6bc9b385ec26f23c098f5de (patch)
tree461b3cf8440a734f88b21687dff6d4d28901d9a1
parent7f309a3257c04abbf20e5467d081da96413e4d21 (diff)
添加scsp模式
-rw-r--r--.vscode/settings.json4
-rw-r--r--bbq/include/bbq.h53
-rw-r--r--bbq/src/bbq.c182
-rw-r--r--bbq/tests/common/test_queue.c11
-rw-r--r--bbq/tests/unittest/ut_example.cc2
-rw-r--r--bbq/tests/unittest/ut_head_cursor.cc80
-rw-r--r--bbq/tests/unittest/ut_mix.cc20
-rw-r--r--bbq/tests/unittest/ut_multit.cc2
-rw-r--r--perf/benchmark/bcm_benchmark.c6
-rw-r--r--perf/benchmark/bcm_queue.c13
10 files changed, 235 insertions, 138 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 2aff784..a379dda 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -92,6 +92,8 @@
"test_mix.h": "c",
"test_queue.h": "c",
"prctl.h": "c",
- "types.h": "c"
+ "types.h": "c",
+ "chrono": "c",
+ "fstream": "c"
}
} \ No newline at end of file
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
index 3b8269d..7f421f8 100644
--- a/bbq/include/bbq.h
+++ b/bbq/include/bbq.h
@@ -1,6 +1,6 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-06-27 03:04:19
+ * @LastEditTime: 2024-06-27 06:20:30
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
@@ -15,41 +15,45 @@
// C
#include <stdatomic.h>
typedef atomic_uint_fast64_t aotmic_uint64;
-typedef aotmic_uint64 bbq_cursor;
-typedef aotmic_uint64 bbq_head;
#else
// C++ 为了兼容gtest测试
-using bbq_cursor = std::atomic<uint64_t>;
-using bbq_head = std::atomic<uint64_t>;
using aotmic_uint64 = std::atomic<uint64_t>;
#endif
#define BBQ_SOCKET_ID_ANY -1
-#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64)))
#define BBQ_SYMBOL_MAX 64
+#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64)))
+
+struct bbq_atomic64 {
+ bool single; // 如果为单生产者或单消费者,则single为true
+ union {
+ volatile uint64_t s; // single使用该字段
+ aotmic_uint64 m;
+ };
+} __BBQ_CACHE_ALIGNED;
struct bbq_block {
- bbq_cursor committed; // 已提交(version|offset)
- bbq_cursor allocated; // 已分配(version|offset)
- bbq_cursor reserved; // 已预留(version|offset)
- bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到
- char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size
+ struct bbq_atomic64 committed; // 已提交(version|offset)
+ struct bbq_atomic64 allocated; // 已分配(version|offset)
+ struct bbq_atomic64 reserved; // 已预留(version|offset)
+ struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到
+ char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size
} __BBQ_CACHE_ALIGNED;
struct bbq {
char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED;
- int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
- uint32_t bn; // blocks的个数
- uint32_t bs; // blocks.entries的个数
- uint32_t flags; // 标记:retry new 模式,还是drop old模式
- uint32_t idx_bits; // bbq_head里idx所占的位数
- uint32_t off_bits; // bbq_cursor里offset所占的位数
- uint64_t idx_mask; // idx_bits偏移后的掩码
- uint64_t off_mask; // off_bits偏移后的掩码
- uint64_t entry_size; // blocks.entries里每个entry的大小
- bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
- bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
+ int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
+ uint32_t bn; // blocks的个数
+ uint32_t bs; // blocks.entries的个数
+ uint32_t flags; // 标记:retry new 模式,还是drop old模式
+ uint32_t idx_bits; // bbq_head里idx所占的位数
+ uint32_t off_bits; // bbq_cursor里offset所占的位数
+ uint64_t idx_mask; // idx_bits偏移后的掩码
+ uint64_t off_mask; // off_bits偏移后的掩码
+ uint64_t entry_size; // blocks.entries里每个entry的大小
+ struct bbq_atomic64 phead; // 生产者头,指向块的索引,分为两部分:version|idx
+ struct bbq_atomic64 chead; // 消费者头,指向块的索引,分为两部分:version|idx
struct bbq_block *blocks; // bn大小的数组
} __BBQ_CACHE_ALIGNED;
@@ -58,6 +62,11 @@ struct bbq {
#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */
+
+#define BBQ_F_SP_ENQ 0x0004
+#define BBQ_F_MP_ENQ BBQ_F_DEFAULT
+#define BBQ_F_SC_DEQ 0x0008
+#define BBQ_F_MC_DEQ BBQ_F_DEFAULT
/**
* 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。
* 对应入队函数:bbq_enqueue、bbq_enqueue_burst
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index 5e63c51..bf7760f 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-26 04:59:28
+ * @LastEditTime: 2024-06-27 07:28:24
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -18,6 +18,8 @@
// 判断flags标记位
#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD)
#define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE)
+#define BBQ_F_CHK_SP_ENQ(flags) (flags & BBQ_F_SP_ENQ)
+#define BBQ_F_CHK_SC_DEQ(flags) (flags & BBQ_F_SC_DEQ)
// 避免无用参数的编译告警
#define AVOID_WARNING(param) ((void)param)
@@ -78,6 +80,31 @@ static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) {
return ver << q->off_bits;
}
+uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic) {
+ if (atomic->single) {
+ return atomic->s;
+ } else {
+ return atomic_load(&atomic->m);
+ }
+}
+
+void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value) {
+ if (atomic->single) {
+ atomic->s = value;
+ } else {
+ atomic_store(&atomic->m, value);
+ }
+}
+static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value) {
+ if (atomic->single) {
+ uint64_t old = atomic->s;
+ atomic->s += value;
+ return old;
+ } else {
+ return atomic_fetch_add(&atomic->m, value);
+ }
+}
+
// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
enum bbq_module {
BBQ_MODULE_MAIN = 0,
@@ -133,11 +160,17 @@ static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t si
}
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
-uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd) {
+uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd) {
uint64_t old_value = 0;
- do {
- old_value = atomic_load(atom);
- } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd));
+
+ if (atomic->single) {
+ old_value = atomic->s;
+ atomic->s = upd;
+ } else {
+ do {
+ old_value = atomic_load(&atomic->m);
+ } while (old_value < upd && !atomic_compare_exchange_weak(&atomic->m, &old_value, upd));
+ }
return old_value;
}
@@ -184,16 +217,20 @@ int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) {
}
/* 块初始化 */
-int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
+int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_t flags) {
+ size_t size = 0;
#ifdef BBQ_MEMORY
// 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题
- block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
- (q->bs + 1) * q->entry_size);
+ size = (q->bs + 1) * q->entry_size;
+ block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, size);
char *last_entry = block->entries + q->entry_size * q->bs;
+ memset(block->entries, 0, size);
memset(last_entry, BBQ_MEM_MAGIC, q->entry_size);
#else
+ size = q->bs * q->entry_size;
block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
q->bs * q->entry_size);
+ memset(block->entries, 0, size);
#endif
if (block->entries == NULL) {
@@ -201,20 +238,23 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
return bbq_errno;
}
- block->committed = ATOMIC_VAR_INIT(0);
- block->allocated = ATOMIC_VAR_INIT(0);
- block->reserved = ATOMIC_VAR_INIT(0);
- block->consumed = ATOMIC_VAR_INIT(0);
+ if (BBQ_F_CHK_SP_ENQ(flags)) {
+ block->allocated.single = true;
+ block->committed.single = true;
+ }
+
+ if (BBQ_F_CHK_SC_DEQ(flags)) {
+ block->reserved.single = true;
+ block->consumed.single = true;
+ }
if (cursor_init) {
// block数组里,除了第一块之外需要设置
- block->committed = ATOMIC_VAR_INIT(q->bs);
- block->allocated = ATOMIC_VAR_INIT(q->bs);
- block->reserved = ATOMIC_VAR_INIT(q->bs);
- if (BBQ_F_CHK_DROP_OLD(q->flags)) {
- block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed
- } else {
- block->consumed = ATOMIC_VAR_INIT(q->bs);
+ bbq_atomic64_store(&block->committed, q->bs);
+ bbq_atomic64_store(&block->allocated, q->bs);
+ bbq_atomic64_store(&block->reserved, q->bs);
+ if (!BBQ_F_CHK_DROP_OLD(q->flags)) {
+ bbq_atomic64_store(&block->consumed, q->bs);
}
}
@@ -254,7 +294,7 @@ static unsigned bbq_ceil_log2(uint64_t x) {
/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */
static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
int ret = 0;
-
+ size_t size = 0;
if (bbq_check_power_of_two(bn) == false) {
bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
@@ -291,19 +331,24 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
q->bs = bs;
q->entry_size = obj_size;
q->socket_id = socket_id;
- q->phead = ATOMIC_VAR_INIT(0);
- q->chead = ATOMIC_VAR_INIT(0);
+ if (BBQ_F_CHK_SP_ENQ(flags)) {
+ q->phead.single = true;
+ }
+ if (BBQ_F_CHK_SC_DEQ(flags)) {
+ q->chead.single = true;
+ }
q->flags = flags;
- q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks));
+ size = bn * sizeof(*q->blocks);
+ q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, size);
if (q->blocks == NULL) {
bbq_errno = BBQ_ERR_ALLOC;
goto error;
}
- memset(q->blocks, 0, sizeof(*q->blocks));
+ memset(q->blocks, 0, size);
for (uint32_t i = 0; i < bn; ++i) {
- ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true));
+ ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true), flags);
if (ret != BBQ_OK) {
goto error;
}
@@ -414,19 +459,19 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin
break;
}
}
- atomic_fetch_add(&e->block->committed, e->actual_burst);
+ bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst);
}
struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
struct bbq_queue_state_s state = {0};
- if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) {
+ if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) {
state.state = BBQ_BLOCK_DONE;
return state;
}
- uint64_t old = atomic_fetch_add(&block->allocated, n);
+ uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n);
// committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1
- uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed));
+ uint64_t committed_vsn = bbq_cur_vsn(q, bbq_atomic64_load(&block->committed));
uint64_t cur_vsn = bbq_cur_vsn(q, old);
uint64_t cur_off = bbq_cur_off(q, old);
@@ -457,13 +502,13 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
uint64_t ph_vsn = bbq_head_vsn(q, ph);
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
- cur = atomic_load(&n_blk->committed);
+ cur = bbq_atomic64_load(&n_blk->committed);
// 生产者head避免覆盖上一轮尚未完全提交的区块
if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) {
return BBQ_NOT_AVAILABLE;
}
} else {
- cur = atomic_load(&n_blk->consumed);
+ cur = bbq_atomic64_load(&n_blk->consumed);
uint64_t reserved;
uint64_t consumed_off = bbq_cur_off(q, cur);
uint64_t consumed_vsn = bbq_cur_vsn(q, cur);
@@ -471,7 +516,7 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
if (consumed_vsn < ph_vsn ||
(consumed_vsn == ph_vsn && consumed_off != q->bs)) {
// 生产者赶上了消费者
- reserved = atomic_load(&n_blk->reserved);
+ reserved = bbq_atomic64_load(&n_blk->reserved);
if (bbq_cur_off(q, reserved) == consumed_off) {
return BBQ_NO_ENTRY;
} else {
@@ -497,21 +542,21 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t
if (ch_ptr != NULL) {
ch = *ch_ptr;
} else {
- ch = atomic_load(&q->chead);
+ ch = bbq_atomic64_load(&q->chead);
}
if (ph_ptr != NULL) {
ph = *ph_ptr;
} else {
- ph = atomic_load(&q->phead);
+ ph = bbq_atomic64_load(&q->phead);
}
uint64_t ph_idx = bbq_head_idx(q, ph);
uint64_t ch_idx = bbq_head_idx(q, ch);
- uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed));
+ uint64_t committed_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ph->committed));
struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]);
- uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved));
+ uint64_t reserved_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ch->reserved));
// "生产者"超过"消费者"块的个数
uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx;
@@ -548,7 +593,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
while (true) {
- uint64_t ph = atomic_load(&q->phead);
+ uint64_t ph = bbq_atomic64_load(&q->phead);
struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ph)]);
struct bbq_queue_state_s state = allocate_entry(q, blk, n);
@@ -604,30 +649,35 @@ int bbq_enqueue_elem(struct bbq *q, void const *data) {
}
/* 更新成功 reserve成功的个数 */
-uint32_t bbq_reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) {
- // TODO:逻辑可以合并
- if (n == 1) {
- // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
- if (bbq_fetch_max(aotmic, reserved + 1) == reserved) {
- return 1;
- }
-
- return 0;
+uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n) {
+ if (atomic->single) {
+ atomic->s += n;
+ return n;
} else {
- bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n);
- return ret == true ? n : 0;
+ // TODO:合并逻辑?
+ if (n == 1) {
+ // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
+ if (bbq_fetch_max(atomic, reserved + 1) == reserved) {
+ return 1;
+ }
+
+ return 0;
+ } else {
+ bool ret = atomic_compare_exchange_weak(&atomic->m, &reserved, reserved + n);
+ return ret == true ? n : 0;
+ }
}
}
struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
while (true) {
struct bbq_queue_state_s state;
- uint64_t reserved = atomic_load(&block->reserved);
+ uint64_t reserved = bbq_atomic64_load(&block->reserved);
uint64_t reserved_off = bbq_cur_off(q, reserved);
uint64_t reserved_svn = bbq_cur_vsn(q, reserved);
if (reserved_off < q->bs) {
- uint64_t consumed = atomic_load(&block->consumed);
+ uint64_t consumed = bbq_atomic64_load(&block->consumed);
// TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理?
// TODO:这个情况可能出现?
if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) {
@@ -637,7 +687,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc
return state;
}
- uint64_t committed = atomic_load(&block->committed);
+ uint64_t committed = bbq_atomic64_load(&block->committed);
uint64_t committed_off = bbq_cur_off(q, committed);
if (committed_off == reserved_off) {
state.state = BBQ_NO_ENTRY;
@@ -646,7 +696,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc
// 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据
if (committed_off != q->bs) {
- uint64_t allocated = atomic_load(&block->allocated);
+ uint64_t allocated = bbq_atomic64_load(&block->allocated);
if (bbq_cur_off(q, allocated) != committed_off) {
state.state = BBQ_NOT_AVAILABLE;
return state;
@@ -712,13 +762,13 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint
uint64_t allocated;
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
// TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head?
- allocated = atomic_load(&e->block->allocated);
+ allocated = bbq_atomic64_load(&e->block->allocated);
// 预留的entry所在的块,已经被新生产的数据赶上了
if (bbq_cur_vsn(q, allocated) != e->vsn) {
return false;
}
} else {
- atomic_fetch_add(&e->block->consumed, e->actual_burst);
+ bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst);
}
return true;
@@ -729,7 +779,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
uint64_t ch_vsn = bbq_head_vsn(q, ch);
- uint64_t committed = atomic_load(&n_blk->committed);
+ uint64_t committed = bbq_atomic64_load(&n_blk->committed);
uint64_t committed_vsn = bbq_cur_vsn(q, committed);
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
// 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序.
@@ -761,7 +811,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
while (true) {
- uint64_t ch = atomic_load(&q->chead);
+ uint64_t ch = bbq_atomic64_load(&q->chead);
struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]);
struct bbq_queue_state_s state;
state = bbq_reserve_entry(q, blk, n);
@@ -933,8 +983,8 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *ob
}
bool bbq_empty(struct bbq *q) {
- uint64_t phead = atomic_load(&q->phead);
- uint64_t chead = atomic_load(&q->chead);
+ uint64_t phead = bbq_atomic64_load(&q->phead);
+ uint64_t chead = bbq_atomic64_load(&q->chead);
uint64_t ph_vsn = bbq_head_vsn(q, phead);
uint64_t ch_vsn = bbq_head_vsn(q, chead);
@@ -949,12 +999,12 @@ bool bbq_empty(struct bbq *q) {
block = &q->blocks[ph_idx];
if (ph_vsn == ch_vsn) {
- if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) {
+ if (bbq_cur_off(q, bbq_atomic64_load(&block->reserved)) == bbq_cur_off(q, bbq_atomic64_load(&block->committed))) {
return true;
}
}
- bbq_cursor reserved = atomic_load(&block->reserved);
+ uint64_t reserved = bbq_atomic64_load(&block->reserved);
uint64_t reserved_off = bbq_cur_off(q, reserved);
if (BBQ_F_CHK_DROP_OLD(q->flags) &&
@@ -1058,10 +1108,10 @@ void bbq_debug_memory_print() {
}
void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
- bbq_cursor allocated = atomic_load(&block->allocated);
- bbq_cursor committed = atomic_load(&block->committed);
- bbq_cursor reserved = atomic_load(&block->reserved);
- bbq_cursor consumed = atomic_load(&block->consumed);
+ uint64_t allocated = bbq_atomic64_load(&block->allocated);
+ uint64_t committed = bbq_atomic64_load(&block->committed);
+ uint64_t reserved = bbq_atomic64_load(&block->reserved);
+ uint64_t consumed = bbq_atomic64_load(&block->consumed);
printf(" allocated:%lu committed:%lu reserved:%lu",
bbq_cur_off(q, allocated), bbq_cur_off(q, committed), bbq_cur_off(q, reserved));
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
@@ -1073,8 +1123,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
void bbq_debug_struct_print(struct bbq *q) {
printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new");
- uint64_t phead = atomic_load(&q->phead);
- uint64_t chead = atomic_load(&q->chead);
+ uint64_t phead = bbq_atomic64_load(&q->phead);
+ uint64_t chead = bbq_atomic64_load(&q->chead);
printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));
diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c
index 0934285..a27c752 100644
--- a/bbq/tests/common/test_queue.c
+++ b/bbq/tests/common/test_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-27 02:50:17
+ * @LastEditTime: 2024-06-27 07:19:03
* @Describe: TODO
*/
@@ -20,6 +20,15 @@ uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16
}
int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
+ unsigned int flags = 0;
+ if (cfg->ring.producer_cnt <= 1) {
+ flags |= BBQ_F_SP_ENQ;
+ }
+
+ if (cfg->ring.consumer_cnt <= 1) {
+ flags |= BBQ_F_SC_DEQ;
+ }
+
if (cfg->ring.block_count == 0) {
q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
} else {
diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc
index 0439612..9e8e86c 100644
--- a/bbq/tests/unittest/ut_example.cc
+++ b/bbq/tests/unittest/ut_example.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-25 11:40:27
+ * @LastEditTime: 2024-06-27 07:05:41
* @Describe: 简单的测试用例,测试基本功能
*/
diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc
index e3b3b50..b1d388b 100644
--- a/bbq/tests/unittest/ut_head_cursor.cc
+++ b/bbq/tests/unittest/ut_head_cursor.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-25 11:42:49
+ * @LastEditTime: 2024-06-27 07:08:44
* @Describe: TODO
*/
@@ -11,6 +11,7 @@ extern "C" {
extern bool bbq_malloc_free_equal();
extern bool bbq_debug_check_array_bounds(struct bbq *q);
extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
+extern uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic);
}
class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
@@ -30,33 +31,39 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
};
void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_head_idx(q, q->phead), idx) << "line: " << line;
- EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line;
+ uint64_t ph = bbq_atomic64_load(&q->phead);
+ EXPECT_EQ(bbq_head_idx(q, ph), idx) << "line: " << line;
+ EXPECT_EQ(bbq_head_vsn(q, ph), vsn) << "line: " << line;
}
void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_head_idx(q, q->chead), idx) << "line: " << line;
- EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line;
+ uint64_t ch = bbq_atomic64_load(&q->chead);
+ EXPECT_EQ(bbq_head_idx(q, ch), idx) << "line: " << line;
+ EXPECT_EQ(bbq_head_vsn(q, ch), vsn) << "line: " << line;
}
void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_cur_off(q, block->allocated), off) << "line: " << line;
- EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line;
+ uint64_t allocated = bbq_atomic64_load(&block->allocated);
+ EXPECT_EQ(bbq_cur_off(q, allocated), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, allocated), vsn) << "line: " << line;
}
void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_cur_off(q, block->committed), off) << "line: " << line;
- EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line;
+ uint64_t committed = bbq_atomic64_load(&block->committed);
+ EXPECT_EQ(bbq_cur_off(q, committed), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, committed), vsn) << "line: " << line;
}
void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_cur_off(q, block->consumed), off) << "line: " << line;
- EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line;
+ uint64_t consumed = bbq_atomic64_load(&block->consumed);
+ EXPECT_EQ(bbq_cur_off(q, consumed), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, consumed), vsn) << "line: " << line;
}
void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_cur_off(q, block->reserved), off) << "line: " << line;
- EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line;
+ uint64_t reserved = bbq_atomic64_load(&block->reserved);
+ EXPECT_EQ(bbq_cur_off(q, reserved), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, reserved), vsn) << "line: " << line;
}
// 初始化状态
@@ -68,8 +75,9 @@ TEST_F(bbq_head_cursor, init) {
ASSERT_NE(q, nullptr);
// 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目
- EXPECT_EQ(q->phead, 0);
- EXPECT_EQ(q->chead, 0);
+
+ EXPECT_EQ(bbq_atomic64_load(&q->phead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
expect_eq_allocated(q, &q->blocks[0], 0, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], 0, 0, __LINE__);
@@ -105,8 +113,8 @@ void ut_produce_something(uint32_t produce_cnt) {
EXPECT_TRUE(ret == BBQ_OK);
}
- EXPECT_EQ(q->phead, 0);
- EXPECT_EQ(q->chead, 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->phead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__);
@@ -119,8 +127,8 @@ void ut_produce_something(uint32_t produce_cnt) {
EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC);
}
- EXPECT_EQ(q->phead, 0);
- EXPECT_EQ(q->chead, 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->phead), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
expect_eq_reserved(q, &q->blocks[0], produce_cnt, 0, __LINE__);
@@ -164,7 +172,7 @@ void ut_produce_next_block(uint32_t over) {
EXPECT_TRUE(ret == BBQ_OK);
}
- EXPECT_EQ(q->chead, 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead), 0);
expect_phead(q, 1, 0, __LINE__);
expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__);
expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__);
@@ -266,7 +274,8 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
uint32_t entries_cnt = 4096;
uint32_t loop = 1000;
struct bbq *q;
-
+ uint64_t ph = 0;
+ uint64_t ch = 0;
int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt);
int tmp_data = 0;
EXPECT_TRUE(data);
@@ -290,14 +299,17 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
EXPECT_TRUE(ret == BBQ_ERR_FULL);
}
+ ph = bbq_atomic64_load(&q->phead);
+ ch = bbq_atomic64_load(&q->chead);
if (i == 0) {
- EXPECT_EQ((q->phead.load() + 1) & q->idx_mask, q->chead.load() & q->idx_mask);
+ EXPECT_EQ((ph + 1) & q->idx_mask, ch & q->idx_mask);
} else {
- EXPECT_EQ((q->phead.load()) & q->idx_mask, q->chead.load() & q->idx_mask);
+ EXPECT_EQ((ph)&q->idx_mask, ch & q->idx_mask);
}
+
for (uint32_t i = 0; i < q->bn; i++) {
- EXPECT_EQ(q->blocks[i].committed.load() & q->off_mask, q->bs);
- EXPECT_GE(q->blocks[i].allocated.load() & q->off_mask, q->bs);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs);
+ EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated) & q->off_mask, q->bs);
}
// 全出队
@@ -315,12 +327,14 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
EXPECT_TRUE(ret == BBQ_ERR_EMPTY);
}
- EXPECT_EQ(q->phead.load() & q->idx_mask, q->chead.load() & q->idx_mask);
+ ph = bbq_atomic64_load(&q->phead);
+ ch = bbq_atomic64_load(&q->chead);
+ EXPECT_EQ(ph & q->idx_mask, ch & q->idx_mask);
for (uint32_t i = 0; i < q->bn; i++) {
- EXPECT_EQ(q->blocks[i].committed.load() & q->off_mask, q->bs);
- EXPECT_GE(q->blocks[i].allocated.load() & q->off_mask, q->bs);
- EXPECT_EQ(q->blocks[i].consumed.load() & q->off_mask, q->bs);
- EXPECT_GE(q->blocks[i].reserved.load() & q->off_mask, q->bs);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs);
+ EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated) & q->off_mask, q->bs);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed) & q->off_mask, q->bs);
+ EXPECT_GE(bbq_atomic64_load(&q->blocks[i].reserved) & q->off_mask, q->bs);
}
}
@@ -434,7 +448,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) {
expect_eq_committed(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__);
expect_eq_allocated(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__);
expect_eq_reserved(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__);
- EXPECT_EQ(q->blocks[i].consumed.load(), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0);
}
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
@@ -486,7 +500,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
expect_eq_reserved(q, &q->blocks[i],
i == 0 ? 0 : bs, 0,
__LINE__);
- EXPECT_EQ(q->blocks[i].consumed.load(), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0);
}
// 队列中的数据全出队
@@ -514,7 +528,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
expect_eq_reserved(q, &q->blocks[i],
i == bn - 1 ? over_cnt - bs : bs,
i == 1 ? loop + 1 : 0, __LINE__);
- EXPECT_EQ(q->blocks[i].consumed.load(), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0);
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc
index fa2f07f..cd545bf 100644
--- a/bbq/tests/unittest/ut_mix.cc
+++ b/bbq/tests/unittest/ut_mix.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-26 03:37:56
+ * @LastEditTime: 2024-06-27 07:25:43
* @Describe: bbq除了队列操作外,其他函数的测试
*/
@@ -11,10 +11,12 @@ extern "C" {
#include <math.h>
extern bool bbq_check_power_of_two(int n);
extern unsigned bbq_ceil_log2(uint64_t x);
-extern uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd);
+extern uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd);
extern bool bbq_malloc_free_equal();
extern bool test_malloc_free_equal();
extern int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs);
+extern void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value);
+extern uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic);
}
class bbq_mix : public testing::Test { // 继承了 testing::Test
@@ -35,7 +37,7 @@ class bbq_mix : public testing::Test { // 继承了 testing::Test
typedef struct {
uint64_t thread_cnt;
- aotmic_uint64 data;
+ bbq_atomic64 data;
aotmic_uint64 ready_thread_cnt;
} ut_fetch_arg;
@@ -54,12 +56,18 @@ void *fetch_max_thread_func(void *arg) {
TEST_F(bbq_mix, bbq_fetch_max) {
uint64_t ret = 0;
- ut_fetch_arg arg = {};
- arg.data.store(1); // 初始化1
+ ut_fetch_arg arg;
+
+ arg.data.single = false;
+ arg.data.m.store(0);
+ arg.thread_cnt = 0;
+ arg.ready_thread_cnt.store(0);
+
+ bbq_atomic64_store(&arg.data, 1); // 初始化1
arg.thread_cnt = 50;
ret = bbq_fetch_max(&arg.data, 2); // max比较后设置为2
- EXPECT_EQ(arg.data.load(), 2);
+ EXPECT_EQ(bbq_atomic64_load(&arg.data), 2);
EXPECT_EQ(ret, 1);
pthread_t *threads = (pthread_t *)test_malloc(TEST_MODULE_UTEST, sizeof(*threads) * arg.thread_cnt);
diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc
index bbe963b..ff5a308 100644
--- a/bbq/tests/unittest/ut_multit.cc
+++ b/bbq/tests/unittest/ut_multit.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-25 11:30:59
+ * @LastEditTime: 2024-06-27 07:06:22
* @Describe: TODO
*/
diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c
index 801cd3a..5407d2b 100644
--- a/perf/benchmark/bcm_benchmark.c
+++ b/perf/benchmark/bcm_benchmark.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-18 18:20:02
+ * @LastEditTime: 2024-06-27 07:11:39
* @Describe: TODO
*/
@@ -101,8 +101,8 @@ int main(int argc, char *argv[]) {
burst_cnt = 1;
}
} else {
- config = "/root/code/c/bbq-ly/perf/benchmark/config/compare/case1_simple_spsc.ini";
- ring_type = "bbq";
+ config = "/root/code/c/bbq/perf/benchmark/config/compare/case1_simple_spsc.ini";
+ ring_type = "dpdk";
burst_cnt = 16;
TEST_ERR_LOG("use default config, ringt_type:%s burst:%u config:%s argc:%d", ring_type, burst_cnt, config, argc);
}
diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c
index 5cbcea7..70f612b 100644
--- a/perf/benchmark/bcm_queue.c
+++ b/perf/benchmark/bcm_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-25 14:14:40
+ * @LastEditTime: 2024-06-27 07:12:44
* @Describe: TODO
*/
@@ -10,11 +10,16 @@
static __rte_always_inline unsigned int
bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
TEST_AVOID_WARNING(thread_idx);
- unsigned int free_space = 0;
int ret = 0;
- ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space);
- *wait_consumed = r->size - free_space - 1;
+ if (wait_consumed) {
+ unsigned int free_space = 0;
+ ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space);
+ *wait_consumed = r->size - free_space - 1;
+ } else {
+ ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL);
+ }
+
return ret;
}