diff options
| author | liuyu <[email protected]> | 2024-06-27 07:34:04 -0400 |
|---|---|---|
| committer | liuyu <[email protected]> | 2024-06-27 07:34:04 -0400 |
| commit | cc654f704fe3caf3c6bc9b385ec26f23c098f5de (patch) | |
| tree | 461b3cf8440a734f88b21687dff6d4d28901d9a1 | |
| parent | 7f309a3257c04abbf20e5467d081da96413e4d21 (diff) | |
添加scsp模式
| -rw-r--r-- | .vscode/settings.json | 4 | ||||
| -rw-r--r-- | bbq/include/bbq.h | 53 | ||||
| -rw-r--r-- | bbq/src/bbq.c | 182 | ||||
| -rw-r--r-- | bbq/tests/common/test_queue.c | 11 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_example.cc | 2 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_head_cursor.cc | 80 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_mix.cc | 20 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_multit.cc | 2 | ||||
| -rw-r--r-- | perf/benchmark/bcm_benchmark.c | 6 | ||||
| -rw-r--r-- | perf/benchmark/bcm_queue.c | 13 |
10 files changed, 235 insertions, 138 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index 2aff784..a379dda 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -92,6 +92,8 @@ "test_mix.h": "c", "test_queue.h": "c", "prctl.h": "c", - "types.h": "c" + "types.h": "c", + "chrono": "c", + "fstream": "c" } }
\ No newline at end of file diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index 3b8269d..7f421f8 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: [email protected] - * @LastEditTime: 2024-06-27 03:04:19 + * @LastEditTime: 2024-06-27 06:20:30 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -15,41 +15,45 @@ // C #include <stdatomic.h> typedef atomic_uint_fast64_t aotmic_uint64; -typedef aotmic_uint64 bbq_cursor; -typedef aotmic_uint64 bbq_head; #else // C++ 为了兼容gtest测试 -using bbq_cursor = std::atomic<uint64_t>; -using bbq_head = std::atomic<uint64_t>; using aotmic_uint64 = std::atomic<uint64_t>; #endif #define BBQ_SOCKET_ID_ANY -1 -#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) #define BBQ_SYMBOL_MAX 64 +#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) + +struct bbq_atomic64 { + bool single; // 如果为单生产者或单消费者,则single为true + union { + volatile uint64_t s; // single使用该字段 + aotmic_uint64 m; + }; +} __BBQ_CACHE_ALIGNED; struct bbq_block { - bbq_cursor committed; // 已提交(version|offset) - bbq_cursor allocated; // 已分配(version|offset) - bbq_cursor reserved; // 已预留(version|offset) - bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size + struct bbq_atomic64 committed; // 已提交(version|offset) + struct bbq_atomic64 allocated; // 已分配(version|offset) + struct bbq_atomic64 reserved; // 已预留(version|offset) + struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到 + char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; struct bbq { char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; - int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 - uint32_t bn; // blocks的个数 - uint32_t bs; // blocks.entries的个数 - uint32_t flags; // 标记:retry new 模式,还是drop old模式 - uint32_t idx_bits; // bbq_head里idx所占的位数 - uint32_t off_bits; // bbq_cursor里offset所占的位数 - uint64_t idx_mask; // idx_bits偏移后的掩码 - uint64_t off_mask; // off_bits偏移后的掩码 - uint64_t entry_size; // blocks.entries里每个entry的大小 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t bn; // blocks的个数 + uint32_t bs; // blocks.entries的个数 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + uint64_t entry_size; // blocks.entries里每个entry的大小 + struct bbq_atomic64 phead; // 生产者头,指向块的索引,分为两部分:version|idx + struct bbq_atomic64 chead; // 消费者头,指向块的索引,分为两部分:version|idx struct bbq_block *blocks; // bn大小的数组 } __BBQ_CACHE_ALIGNED; @@ -58,6 +62,11 @@ struct bbq { #define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ #define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ + +#define BBQ_F_SP_ENQ 0x0004 +#define BBQ_F_MP_ENQ BBQ_F_DEFAULT +#define BBQ_F_SC_DEQ 0x0008 +#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /** * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 * 对应入队函数:bbq_enqueue、bbq_enqueue_burst diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index 5e63c51..bf7760f 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-26 04:59:28 + * @LastEditTime: 2024-06-27 07:28:24 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -18,6 +18,8 @@ // 判断flags标记位 #define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) #define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE) +#define BBQ_F_CHK_SP_ENQ(flags) (flags & BBQ_F_SP_ENQ) +#define BBQ_F_CHK_SC_DEQ(flags) (flags & BBQ_F_SC_DEQ) // 避免无用参数的编译告警 #define AVOID_WARNING(param) ((void)param) @@ -78,6 +80,31 @@ static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) { return ver << q->off_bits; } +uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic) { + if (atomic->single) { + return atomic->s; + } else { + return atomic_load(&atomic->m); + } +} + +void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value) { + if (atomic->single) { + atomic->s = value; + } else { + atomic_store(&atomic->m, value); + } +} +static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value) { + if (atomic->single) { + uint64_t old = atomic->s; + atomic->s += value; + return old; + } else { + return atomic_fetch_add(&atomic->m, value); + } +} + // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 enum bbq_module { BBQ_MODULE_MAIN = 0, @@ -133,11 +160,17 @@ static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t si } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd) { +uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd) { uint64_t old_value = 0; - do { - old_value = atomic_load(atom); - } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd)); + + if (atomic->single) { + old_value = atomic->s; + atomic->s = upd; + } else { + do { + old_value = atomic_load(&atomic->m); + } while (old_value < upd && !atomic_compare_exchange_weak(&atomic->m, &old_value, upd)); + } return old_value; } @@ -184,16 +217,20 @@ int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) { } /* 块初始化 */ -int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { +int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_t flags) { + size_t size = 0; #ifdef BBQ_MEMORY // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题 - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - (q->bs + 1) * q->entry_size); + size = (q->bs + 1) * q->entry_size; + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, size); char *last_entry = block->entries + q->entry_size * q->bs; + memset(block->entries, 0, size); memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else + size = q->bs * q->entry_size; block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); + memset(block->entries, 0, size); #endif if (block->entries == NULL) { @@ -201,20 +238,23 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { return bbq_errno; } - block->committed = ATOMIC_VAR_INIT(0); - block->allocated = ATOMIC_VAR_INIT(0); - block->reserved = ATOMIC_VAR_INIT(0); - block->consumed = ATOMIC_VAR_INIT(0); + if (BBQ_F_CHK_SP_ENQ(flags)) { + block->allocated.single = true; + block->committed.single = true; + } + + if (BBQ_F_CHK_SC_DEQ(flags)) { + block->reserved.single = true; + block->consumed.single = true; + } if (cursor_init) { // block数组里,除了第一块之外需要设置 - block->committed = ATOMIC_VAR_INIT(q->bs); - block->allocated = ATOMIC_VAR_INIT(q->bs); - block->reserved = ATOMIC_VAR_INIT(q->bs); - if (BBQ_F_CHK_DROP_OLD(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed - } else { - block->consumed = ATOMIC_VAR_INIT(q->bs); + bbq_atomic64_store(&block->committed, q->bs); + bbq_atomic64_store(&block->allocated, q->bs); + bbq_atomic64_store(&block->reserved, q->bs); + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { + bbq_atomic64_store(&block->consumed, q->bs); } } @@ -254,7 +294,7 @@ static unsigned bbq_ceil_log2(uint64_t x) { /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { int ret = 0; - + size_t size = 0; if (bbq_check_power_of_two(bn) == false) { bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; @@ -291,19 +331,24 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, q->bs = bs; q->entry_size = obj_size; q->socket_id = socket_id; - q->phead = ATOMIC_VAR_INIT(0); - q->chead = ATOMIC_VAR_INIT(0); + if (BBQ_F_CHK_SP_ENQ(flags)) { + q->phead.single = true; + } + if (BBQ_F_CHK_SC_DEQ(flags)) { + q->chead.single = true; + } q->flags = flags; - q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + size = bn * sizeof(*q->blocks); + q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, size); if (q->blocks == NULL) { bbq_errno = BBQ_ERR_ALLOC; goto error; } - memset(q->blocks, 0, sizeof(*q->blocks)); + memset(q->blocks, 0, size); for (uint32_t i = 0; i < bn; ++i) { - ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true)); + ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true), flags); if (ret != BBQ_OK) { goto error; } @@ -414,19 +459,19 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin break; } } - atomic_fetch_add(&e->block->committed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst); } struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { struct bbq_queue_state_s state = {0}; - if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) { + if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } - uint64_t old = atomic_fetch_add(&block->allocated, n); + uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n); // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 - uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed)); + uint64_t committed_vsn = bbq_cur_vsn(q, bbq_atomic64_load(&block->committed)); uint64_t cur_vsn = bbq_cur_vsn(q, old); uint64_t cur_off = bbq_cur_off(q, old); @@ -457,13 +502,13 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { uint64_t ph_vsn = bbq_head_vsn(q, ph); if (BBQ_F_CHK_DROP_OLD(q->flags)) { - cur = atomic_load(&n_blk->committed); + cur = bbq_atomic64_load(&n_blk->committed); // 生产者head避免覆盖上一轮尚未完全提交的区块 if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; } } else { - cur = atomic_load(&n_blk->consumed); + cur = bbq_atomic64_load(&n_blk->consumed); uint64_t reserved; uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); @@ -471,7 +516,7 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { if (consumed_vsn < ph_vsn || (consumed_vsn == ph_vsn && consumed_off != q->bs)) { // 生产者赶上了消费者 - reserved = atomic_load(&n_blk->reserved); + reserved = bbq_atomic64_load(&n_blk->reserved); if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; } else { @@ -497,21 +542,21 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t if (ch_ptr != NULL) { ch = *ch_ptr; } else { - ch = atomic_load(&q->chead); + ch = bbq_atomic64_load(&q->chead); } if (ph_ptr != NULL) { ph = *ph_ptr; } else { - ph = atomic_load(&q->phead); + ph = bbq_atomic64_load(&q->phead); } uint64_t ph_idx = bbq_head_idx(q, ph); uint64_t ch_idx = bbq_head_idx(q, ch); - uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed)); + uint64_t committed_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ph->committed)); struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); - uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved)); + uint64_t reserved_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ch->reserved)); // "生产者"超过"消费者"块的个数 uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; @@ -548,7 +593,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } while (true) { - uint64_t ph = atomic_load(&q->phead); + uint64_t ph = bbq_atomic64_load(&q->phead); struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ph)]); struct bbq_queue_state_s state = allocate_entry(q, blk, n); @@ -604,30 +649,35 @@ int bbq_enqueue_elem(struct bbq *q, void const *data) { } /* 更新成功 reserve成功的个数 */ -uint32_t bbq_reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { - // TODO:逻辑可以合并 - if (n == 1) { - // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (bbq_fetch_max(aotmic, reserved + 1) == reserved) { - return 1; - } - - return 0; +uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n) { + if (atomic->single) { + atomic->s += n; + return n; } else { - bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n); - return ret == true ? n : 0; + // TODO:合并逻辑? + if (n == 1) { + // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 + if (bbq_fetch_max(atomic, reserved + 1) == reserved) { + return 1; + } + + return 0; + } else { + bool ret = atomic_compare_exchange_weak(&atomic->m, &reserved, reserved + n); + return ret == true ? n : 0; + } } } struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { while (true) { struct bbq_queue_state_s state; - uint64_t reserved = atomic_load(&block->reserved); + uint64_t reserved = bbq_atomic64_load(&block->reserved); uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) { - uint64_t consumed = atomic_load(&block->consumed); + uint64_t consumed = bbq_atomic64_load(&block->consumed); // TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理? // TODO:这个情况可能出现? if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) { @@ -637,7 +687,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc return state; } - uint64_t committed = atomic_load(&block->committed); + uint64_t committed = bbq_atomic64_load(&block->committed); uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; @@ -646,7 +696,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 if (committed_off != q->bs) { - uint64_t allocated = atomic_load(&block->allocated); + uint64_t allocated = bbq_atomic64_load(&block->allocated); if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; @@ -712,13 +762,13 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint uint64_t allocated; if (BBQ_F_CHK_DROP_OLD(q->flags)) { // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? - allocated = atomic_load(&e->block->allocated); + allocated = bbq_atomic64_load(&e->block->allocated); // 预留的entry所在的块,已经被新生产的数据赶上了 if (bbq_cur_vsn(q, allocated) != e->vsn) { return false; } } else { - atomic_fetch_add(&e->block->consumed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst); } return true; @@ -729,7 +779,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t committed = atomic_load(&n_blk->committed); + uint64_t committed = bbq_atomic64_load(&n_blk->committed); uint64_t committed_vsn = bbq_cur_vsn(q, committed); if (BBQ_F_CHK_DROP_OLD(q->flags)) { // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. @@ -761,7 +811,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } while (true) { - uint64_t ch = atomic_load(&q->chead); + uint64_t ch = bbq_atomic64_load(&q->chead); struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; state = bbq_reserve_entry(q, blk, n); @@ -933,8 +983,8 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *ob } bool bbq_empty(struct bbq *q) { - uint64_t phead = atomic_load(&q->phead); - uint64_t chead = atomic_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead); + uint64_t chead = bbq_atomic64_load(&q->chead); uint64_t ph_vsn = bbq_head_vsn(q, phead); uint64_t ch_vsn = bbq_head_vsn(q, chead); @@ -949,12 +999,12 @@ bool bbq_empty(struct bbq *q) { block = &q->blocks[ph_idx]; if (ph_vsn == ch_vsn) { - if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) { + if (bbq_cur_off(q, bbq_atomic64_load(&block->reserved)) == bbq_cur_off(q, bbq_atomic64_load(&block->committed))) { return true; } } - bbq_cursor reserved = atomic_load(&block->reserved); + uint64_t reserved = bbq_atomic64_load(&block->reserved); uint64_t reserved_off = bbq_cur_off(q, reserved); if (BBQ_F_CHK_DROP_OLD(q->flags) && @@ -1058,10 +1108,10 @@ void bbq_debug_memory_print() { } void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { - bbq_cursor allocated = atomic_load(&block->allocated); - bbq_cursor committed = atomic_load(&block->committed); - bbq_cursor reserved = atomic_load(&block->reserved); - bbq_cursor consumed = atomic_load(&block->consumed); + uint64_t allocated = bbq_atomic64_load(&block->allocated); + uint64_t committed = bbq_atomic64_load(&block->committed); + uint64_t reserved = bbq_atomic64_load(&block->reserved); + uint64_t consumed = bbq_atomic64_load(&block->consumed); printf(" allocated:%lu committed:%lu reserved:%lu", bbq_cur_off(q, allocated), bbq_cur_off(q, committed), bbq_cur_off(q, reserved)); if (BBQ_F_CHK_DROP_OLD(q->flags)) { @@ -1073,8 +1123,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { void bbq_debug_struct_print(struct bbq *q) { printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); - uint64_t phead = atomic_load(&q->phead); - uint64_t chead = atomic_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead); + uint64_t chead = bbq_atomic64_load(&q->chead); printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c index 0934285..a27c752 100644 --- a/bbq/tests/common/test_queue.c +++ b/bbq/tests/common/test_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-27 02:50:17 + * @LastEditTime: 2024-06-27 07:19:03 * @Email: [email protected] * @Describe: TODO */ @@ -20,6 +20,15 @@ uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16 } int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { + unsigned int flags = 0; + if (cfg->ring.producer_cnt <= 1) { + flags |= BBQ_F_SP_ENQ; + } + + if (cfg->ring.consumer_cnt <= 1) { + flags |= BBQ_F_SC_DEQ; + } + if (cfg->ring.block_count == 0) { q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); } else { diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc index 0439612..9e8e86c 100644 --- a/bbq/tests/unittest/ut_example.cc +++ b/bbq/tests/unittest/ut_example.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 11:40:27 + * @LastEditTime: 2024-06-27 07:05:41 * @Email: [email protected] * @Describe: 简单的测试用例,测试基本功能 */ diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc index e3b3b50..b1d388b 100644 --- a/bbq/tests/unittest/ut_head_cursor.cc +++ b/bbq/tests/unittest/ut_head_cursor.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 11:42:49 + * @LastEditTime: 2024-06-27 07:08:44 * @Email: [email protected] * @Describe: TODO */ @@ -11,6 +11,7 @@ extern "C" { extern bool bbq_malloc_free_equal(); extern bool bbq_debug_check_array_bounds(struct bbq *q); extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); +extern uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic); } class bbq_head_cursor : public testing::Test { // 继承了 testing::Test @@ -30,33 +31,39 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test }; void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - EXPECT_EQ(bbq_head_idx(q, q->phead), idx) << "line: " << line; - EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line; + uint64_t ph = bbq_atomic64_load(&q->phead); + EXPECT_EQ(bbq_head_idx(q, ph), idx) << "line: " << line; + EXPECT_EQ(bbq_head_vsn(q, ph), vsn) << "line: " << line; } void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - EXPECT_EQ(bbq_head_idx(q, q->chead), idx) << "line: " << line; - EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line; + uint64_t ch = bbq_atomic64_load(&q->chead); + EXPECT_EQ(bbq_head_idx(q, ch), idx) << "line: " << line; + EXPECT_EQ(bbq_head_vsn(q, ch), vsn) << "line: " << line; } void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->allocated), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line; + uint64_t allocated = bbq_atomic64_load(&block->allocated); + EXPECT_EQ(bbq_cur_off(q, allocated), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, allocated), vsn) << "line: " << line; } void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->committed), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line; + uint64_t committed = bbq_atomic64_load(&block->committed); + EXPECT_EQ(bbq_cur_off(q, committed), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, committed), vsn) << "line: " << line; } void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->consumed), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line; + uint64_t consumed = bbq_atomic64_load(&block->consumed); + EXPECT_EQ(bbq_cur_off(q, consumed), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, consumed), vsn) << "line: " << line; } void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->reserved), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line; + uint64_t reserved = bbq_atomic64_load(&block->reserved); + EXPECT_EQ(bbq_cur_off(q, reserved), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, reserved), vsn) << "line: " << line; } // 初始化状态 @@ -68,8 +75,9 @@ TEST_F(bbq_head_cursor, init) { ASSERT_NE(q, nullptr); // 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目 - EXPECT_EQ(q->phead, 0); - EXPECT_EQ(q->chead, 0); + + EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_eq_allocated(q, &q->blocks[0], 0, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], 0, 0, __LINE__); @@ -105,8 +113,8 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_TRUE(ret == BBQ_OK); } - EXPECT_EQ(q->phead, 0); - EXPECT_EQ(q->chead, 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__); @@ -119,8 +127,8 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC); } - EXPECT_EQ(q->phead, 0); - EXPECT_EQ(q->chead, 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_reserved(q, &q->blocks[0], produce_cnt, 0, __LINE__); @@ -164,7 +172,7 @@ void ut_produce_next_block(uint32_t over) { EXPECT_TRUE(ret == BBQ_OK); } - EXPECT_EQ(q->chead, 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_phead(q, 1, 0, __LINE__); expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__); @@ -266,7 +274,8 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { uint32_t entries_cnt = 4096; uint32_t loop = 1000; struct bbq *q; - + uint64_t ph = 0; + uint64_t ch = 0; int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt); int tmp_data = 0; EXPECT_TRUE(data); @@ -290,14 +299,17 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { EXPECT_TRUE(ret == BBQ_ERR_FULL); } + ph = bbq_atomic64_load(&q->phead); + ch = bbq_atomic64_load(&q->chead); if (i == 0) { - EXPECT_EQ((q->phead.load() + 1) & q->idx_mask, q->chead.load() & q->idx_mask); + EXPECT_EQ((ph + 1) & q->idx_mask, ch & q->idx_mask); } else { - EXPECT_EQ((q->phead.load()) & q->idx_mask, q->chead.load() & q->idx_mask); + EXPECT_EQ((ph)&q->idx_mask, ch & q->idx_mask); } + for (uint32_t i = 0; i < q->bn; i++) { - EXPECT_EQ(q->blocks[i].committed.load() & q->off_mask, q->bs); - EXPECT_GE(q->blocks[i].allocated.load() & q->off_mask, q->bs); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs); + EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated) & q->off_mask, q->bs); } // 全出队 @@ -315,12 +327,14 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { EXPECT_TRUE(ret == BBQ_ERR_EMPTY); } - EXPECT_EQ(q->phead.load() & q->idx_mask, q->chead.load() & q->idx_mask); + ph = bbq_atomic64_load(&q->phead); + ch = bbq_atomic64_load(&q->chead); + EXPECT_EQ(ph & q->idx_mask, ch & q->idx_mask); for (uint32_t i = 0; i < q->bn; i++) { - EXPECT_EQ(q->blocks[i].committed.load() & q->off_mask, q->bs); - EXPECT_GE(q->blocks[i].allocated.load() & q->off_mask, q->bs); - EXPECT_EQ(q->blocks[i].consumed.load() & q->off_mask, q->bs); - EXPECT_GE(q->blocks[i].reserved.load() & q->off_mask, q->bs); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs); + EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated) & q->off_mask, q->bs); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed) & q->off_mask, q->bs); + EXPECT_GE(bbq_atomic64_load(&q->blocks[i].reserved) & q->off_mask, q->bs); } } @@ -434,7 +448,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) { expect_eq_committed(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__); expect_eq_allocated(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__); expect_eq_reserved(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__); - EXPECT_EQ(q->blocks[i].consumed.load(), 0); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0); } } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); @@ -486,7 +500,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { expect_eq_reserved(q, &q->blocks[i], i == 0 ? 0 : bs, 0, __LINE__); - EXPECT_EQ(q->blocks[i].consumed.load(), 0); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0); } // 队列中的数据全出队 @@ -514,7 +528,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { expect_eq_reserved(q, &q->blocks[i], i == bn - 1 ? over_cnt - bs : bs, i == 1 ? loop + 1 : 0, __LINE__); - EXPECT_EQ(q->blocks[i].consumed.load(), 0); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0); } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc index fa2f07f..cd545bf 100644 --- a/bbq/tests/unittest/ut_mix.cc +++ b/bbq/tests/unittest/ut_mix.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-26 03:37:56 + * @LastEditTime: 2024-06-27 07:25:43 * @Email: [email protected] * @Describe: bbq除了队列操作外,其他函数的测试 */ @@ -11,10 +11,12 @@ extern "C" { #include <math.h> extern bool bbq_check_power_of_two(int n); extern unsigned bbq_ceil_log2(uint64_t x); -extern uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd); +extern uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd); extern bool bbq_malloc_free_equal(); extern bool test_malloc_free_equal(); extern int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs); +extern void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value); +extern uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic); } class bbq_mix : public testing::Test { // 继承了 testing::Test @@ -35,7 +37,7 @@ class bbq_mix : public testing::Test { // 继承了 testing::Test typedef struct { uint64_t thread_cnt; - aotmic_uint64 data; + bbq_atomic64 data; aotmic_uint64 ready_thread_cnt; } ut_fetch_arg; @@ -54,12 +56,18 @@ void *fetch_max_thread_func(void *arg) { TEST_F(bbq_mix, bbq_fetch_max) { uint64_t ret = 0; - ut_fetch_arg arg = {}; - arg.data.store(1); // 初始化1 + ut_fetch_arg arg; + + arg.data.single = false; + arg.data.m.store(0); + arg.thread_cnt = 0; + arg.ready_thread_cnt.store(0); + + bbq_atomic64_store(&arg.data, 1); // 初始化1 arg.thread_cnt = 50; ret = bbq_fetch_max(&arg.data, 2); // max比较后设置为2 - EXPECT_EQ(arg.data.load(), 2); + EXPECT_EQ(bbq_atomic64_load(&arg.data), 2); EXPECT_EQ(ret, 1); pthread_t *threads = (pthread_t *)test_malloc(TEST_MODULE_UTEST, sizeof(*threads) * arg.thread_cnt); diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc index bbe963b..ff5a308 100644 --- a/bbq/tests/unittest/ut_multit.cc +++ b/bbq/tests/unittest/ut_multit.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 11:30:59 + * @LastEditTime: 2024-06-27 07:06:22 * @Email: [email protected] * @Describe: TODO */ diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c index 801cd3a..5407d2b 100644 --- a/perf/benchmark/bcm_benchmark.c +++ b/perf/benchmark/bcm_benchmark.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-18 18:20:02 + * @LastEditTime: 2024-06-27 07:11:39 * @Email: [email protected] * @Describe: TODO */ @@ -101,8 +101,8 @@ int main(int argc, char *argv[]) { burst_cnt = 1; } } else { - config = "/root/code/c/bbq-ly/perf/benchmark/config/compare/case1_simple_spsc.ini"; - ring_type = "bbq"; + config = "/root/code/c/bbq/perf/benchmark/config/compare/case1_simple_spsc.ini"; + ring_type = "dpdk"; burst_cnt = 16; TEST_ERR_LOG("use default config, ringt_type:%s burst:%u config:%s argc:%d", ring_type, burst_cnt, config, argc); } diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c index 5cbcea7..70f612b 100644 --- a/perf/benchmark/bcm_queue.c +++ b/perf/benchmark/bcm_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 14:14:40 + * @LastEditTime: 2024-06-27 07:12:44 * @Email: [email protected] * @Describe: TODO */ @@ -10,11 +10,16 @@ static __rte_always_inline unsigned int bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { TEST_AVOID_WARNING(thread_idx); - unsigned int free_space = 0; int ret = 0; - ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space); - *wait_consumed = r->size - free_space - 1; + if (wait_consumed) { + unsigned int free_space = 0; + ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space); + *wait_consumed = r->size - free_space - 1; + } else { + ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL); + } + return ret; } |
