From e5ddd34ebba3a789fd4044085622710c30842c82 Mon Sep 17 00:00:00 2001 From: 刘煜 Date: Mon, 24 Jun 2024 02:22:35 +0000 Subject: 调整结构体和完善api接口 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bbq/include/bbq.h | 265 ++++++++++------- bbq/src/bbq.c | 560 +++++++++++++++++++++-------------- bbq/test.c | 54 ---- bbq/tests/common/test_queue.c | 31 +- bbq/tests/common/test_queue.h | 14 +- bbq/tests/unittest/ut.h | 2 +- bbq/tests/unittest/ut_example.cc | 181 ++++++----- bbq/tests/unittest/ut_head_cursor.cc | 162 +++++----- bbq/tests/unittest/ut_mix.cc | 10 +- bbq/tests/unittest/ut_multit.cc | 6 +- perf/CMakeLists.txt | 4 +- perf/benchmark/bcm_benchmark.c | 8 +- perf/benchmark/bcm_queue.c | 24 +- 13 files changed, 720 insertions(+), 601 deletions(-) delete mode 100644 bbq/test.c diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index 0b7921c..56785fc 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,12 +1,11 @@ /* * @Author: liuyu@geedgenetworks.com - * @LastEditTime: 2024-06-18 14:16:54 + * @LastEditTime: 2024-06-24 10:17:39 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ -#ifndef _BBQ_H_ -#define _BBQ_H_ +#pragma once #include #include @@ -25,9 +24,11 @@ using bbq_head = std::atomic; using aotmic_uint64 = std::atomic; #endif +#define BBQ_SOCKET_ID_ANY -1 #define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) +#define BBQ_SYMBOL_MAX 64 -struct bbq_block_s { +struct bbq_block { bbq_cursor committed; // 已提交(version|offset) bbq_cursor allocated; // 已分配(version|offset) bbq_cursor reserved; // 已预留(version|offset) @@ -35,165 +36,180 @@ struct bbq_block_s { char *entries; // 存储大小可变的entry,分配空间大小:bs * entry_size } __BBQ_CACHE_ALIGNED; -struct bbq_s { - size_t bs; // 每个block里entries成员的大小 - size_t bn; // 块(blocks)的个数 - size_t obj_size; // 入队数据的实际大小,int int* int**,都为sizeof(int) - size_t entry_size; // blocks.entries里每个entry的大小 - - int32_t socket_id; // 在哪个socket_id上使用libnuma分配内存,小于0将使用malloc分配 - uint32_t flags; // 标记:retry new 模式,还是drop old模式 - uint32_t idx_bits; // bbq_head里idx所占的位数 - uint32_t off_bits; // bbq_cursor里offset所占的位数 - - uint64_t idx_mask; // idx_bits偏移后的掩码 - uint64_t off_mask; // off_bits偏移后的掩码 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx - - struct bbq_block_s *blocks; // bn大小的数组 +struct bbq { + char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t bn; // blocks的个数 + uint32_t bs; // blocks.entries的个数 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + uint64_t entry_size; // blocks.entries里每个entry的大小 + bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + struct bbq_block *blocks; // bn大小的数组 }; -// 创建队列时flags可选参数 -// 第一位控制入队策略,默认是retry new -#define BBQ_CREATE_F_DROP_OLD 0x0001 /**< 创建队列时设置为drop old模式(队列满时,覆盖旧数据,入队成功) */ -#define BBQ_CREATE_F_RETRY_NEW 0x0 /**< 默认为retry new模式(队列满时,当前入队失败) */ - -// 第二位控制入队时的数据拷贝策略,默认是copy pointer -#define BBQ_CREATE_F_COPY_VALUE 0x0002 /**< 创建队列时设置为拷贝数值 */ -#define BBQ_CREATE_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */ +#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ +// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */ +// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */ +#define BBQ_F_DEFAULT 0x0 +#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ +#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */ +#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */ /** - * 创建并返回bbq队列结构指针 + * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 + * 对应入队函数:bbq_enqueue、bbq_enqueue_burst + * 对应出队函数:bbq_dequeue、bbq_dequeue_burst * + * @param[in] name + * bbq名称 * @param[in] count - * 队列所有entry的个数(所有块下entry个数的总和),count必须大于1,且是2的N次方。 - * 函数将会根据count自动计算出合理的块个数,并将entry平均分配到每个块里。 - * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。 - * @param[in] obj_size - * 入队数据的实际大小,int int* int**,都为sizeof(int),因为在burst的时候会传入数组, - * 需要根据数据大小来计算偏移,请正确设置该值。 - * @param[in] flags - * 设置入队策略,通过BBQ_CREATE_F_XX系列宏定义设置flags - * 1)第一位控制入队策略,默认是retry new模式(队列满了当前入队失败)。 - * 如果要设置为drop old模式,需要flags|BBQ_CREATE_F_DROP_OLD - * 2)第二位控制数据入队列时,将"指针"入队,还是将"指针指向的数据"入队。默认入队"指针", - * 如果把"指针指向的数据"入队,需要设置flags|BBQ_CREATE_F_COPY_VALUE - * 3)可以同时设置多个flag,入BBQ_CREATE_F_DROP_OLD|BBQ_CREATE_F_COPY_VALUE - * @return - * 非NULL:消息队列结构体指针,用于后续出队、入队等操作。 - * NULL:创建失败。 - */ -extern struct bbq_s *bbq_create(uint32_t count, size_t obj_size, uint32_t flags); - -/** - * 创建并返回bbq队列结构指针,支持多numa - * - * @param[in] count - * 队列所有entry的个数(所有块下entry个数的总和),count必须大于1,且是2的N次方。 - * 函数将会根据count自动计算出合理的块个数,并将entry平均分配到每个块里。 - * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。 - * @param[in] flags - * 设置入队策略,通过BBQ_CREATE_F_XX系列宏定义设置flags - * 1)第一位控制入队策略,默认是retry new模式(队列满了当前入队失败)。 - * 如果要设置为drop old模式,需要flags|BBQ_CREATE_F_DROP_OLD - * 2)第二位控制数据入队列时,将"指针"入队,还是将"指针指向的数据"入队。默认入队"指针", - * 如果把"指针指向的数据"入队,需要设置flags|BBQ_CREATE_F_COPY_VALUE - * 3)可以同时设置多个flag,入BBQ_CREATE_F_DROP_OLD|BBQ_CREATE_F_COPY_VALUE - * @param[in] obj_size - * 入队数据的实际大小,int int* int**,都为sizeof(int),因为在burst的时候会传入数组, - * 需要根据数据大小来计算偏移,请正确设置该值。 + * 队列所有entry的个数,count必须大于1,且是2的N次方。 * @param[in] socket_id - * 多numa架构下,队列里的空间将针对指定socket调用libnuma库函数分配内存。 + * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 * 当检测到不支持多numa,将转为malloc分配内存。 + * @param[in] flags + * 设置入队策略: + * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 + * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 * NULL:创建失败。 */ -extern struct bbq_s *bbq_create_with_socket(uint32_t count, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags); /** - * 用于释放消息队列,与bbq_create/bbq_create_with_socket函数成对。 + * 消息队列单个指针入队 * * @param[in] q * 队列指针 + * @param[in] data + * 则传入一维指针,如: + * int *data = malloc(sizeof(int));*data = 1; 传入&data + * @return + * 成功返回0,失败返回小于0的错误码。 */ -extern void bbq_ring_free(struct bbq_s *q); +extern int bbq_enqueue(struct bbq *q, void *const *data); /** - * 消息队列单个数据入队 + * 消息队列单个指针出队 * * @param[in] q * 队列指针 - * @param[in] data - * 创建队列时: - * 1)如果flag设置了BBQ_CREATE_F_COPY_VALUE,则传入一维指针,如 int data,如: - * int data,传入&data - * int *data = malloc(sizeof(int));*data = 1; 传入data - * 2)如果flag设置了BBQ_CREATE_F_COPY_PTR,则传入二维指针,如 int *data,如: - * int *data = malloc(sizeof(int));*data = 1; 传入&data + * @param[out] data + * 则传入二维指针,如: + * int *data = NULL; 传入&data * @return * 成功返回0,失败返回小于0的错误码。 */ -extern int bbq_enqueue(struct bbq_s *q, void *data); +extern int bbq_dequeue(struct bbq *q, void **data); /** - * 消息队列批量入队(指针入队),尽可能一次入队n个数据,返回实际成功入队个数 + * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数 * * @param[in] q * 队列指针 * @param[in] obj_table - * 即将入队的指针数组,将数组里的每个成员(指针)入队 + * 即将入队的指针数组,如: + * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT); + * for(int i=0;i #include -// -----------------------------日志宏定义------------------------------- +// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针" +#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */ +#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ + +// 判断flags标记位 +#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) +#define BBQ_F_CHK_VALUE(flags) (flags & BBQ_F_COPY_VALUE) + +// 避免无用参数的编译告警 +#define AVOID_WARNING(param) ((void)param) + #define BBQ_ERR_LOG(fmt, ...) \ do { \ printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \ } while (0) -// -----------------------------其他宏定义------------------------------- -#define AVOID_WARNING(param) ((void)param) -#define BBQ_INVALID_SOCKET -1 - -#define BBQ_POLIC_DROP_OLD(flags) (flags & BBQ_F_POLICY_DROP_OLD) -#define BBQ_POLIC_RETRY_NEW(flags) (!(flags & BBQ_CREATE_F_DROP_OLD)) - -#define BBQ_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE) -#define BBQ_COPY_POINTER(flags) (!(flags & BBQ_CREATE_F_COPY_VALUE)) - -// -----------------------------用于内存测试------------------------------- -// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -// #define BBQ_MEMORY -enum bbq_module_e { - BBQ_MODULE_QUEUE = 0, - BBQ_MODULE_QUEUE_BLOCK_NB, - BBQ_MODULE_QUEUE_BLOCK_ENTRY, - BBQ_MODULE_MAX, -}; - struct bbq_status { int32_t status; // 返回状态 uint32_t actual_burst; // 实际出/入队个数 }; -enum bbq_queue_state_e { +enum bbq_queue_state { BBQ_SUCCESS = 0, - BBQ_BLOCK_DONE, // 当前块已的entry已用完,需要移动到下一个块 - BBQ_NO_ENTRY, // 没有条目可以使用 + BBQ_BLOCK_DONE, // 当前块的entry已用完,需要移动到下一个块 + BBQ_NO_ENTRY, // 队列里没有entry可以使用了 BBQ_NOT_AVAILABLE, // 当前块不可以用状态(将返回busy) BBQ_ALLOCATED, // 已分配,返回entry信息 BBQ_RESERVED, // 已保留,返回entry信息 }; -struct bbq_entry_desc_s { - uint64_t off; // entry在当前block的偏移(offset) - uint64_t vsn; // allocated游标的版本(vsn) - uint32_t actual_burst; // 实际出入队个数 - struct bbq_block_s *block; // 指向所在的block +struct bbq_entry_desc { + uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释 + uint64_t off; // entry在当前block的偏移(offset) + uint32_t actual_burst; // 实际出/入队个数 + struct bbq_block *block; // 指向所在的block }; struct bbq_queue_state_s { - enum bbq_queue_state_e state; // 队列状态 - union { - uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 - struct bbq_entry_desc_s e; // state为ALLOCATED、RESERVED生效 + enum bbq_queue_state state; // 队列状态 + union { // TODO: + uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 + struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效 }; }; -extern inline uint64_t bbq_idx(struct bbq_s *q, uint64_t x) { +extern inline uint64_t bbq_idx(struct bbq *q, uint64_t x) { return x & q->idx_mask; } -extern inline uint64_t bbq_off(struct bbq_s *q, uint64_t x) { +extern inline uint64_t bbq_off(struct bbq *q, uint64_t x) { return x & q->off_mask; } -extern inline uint64_t bbq_head_vsn(struct bbq_s *q, uint64_t x) { +extern inline uint64_t bbq_head_vsn(struct bbq *q, uint64_t x) { return x >> q->idx_bits; } -extern inline uint64_t bbq_cur_vsn(struct bbq_s *q, uint64_t x) { +extern inline uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x) { return x >> q->off_bits; } -extern inline uint64_t set_cur_vsn(struct bbq_s *q, uint64_t ver) { +extern inline uint64_t set_cur_vsn(struct bbq *q, uint64_t ver) { return ver << q->off_bits; } -#ifdef BBQ_MEMORY -#define BBQ_MEM_MAGIC 0xFF -#endif +// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 +// #define BBQ_MEMORY +enum bbq_module { + BBQ_MODULE_QUEUE = 0, + BBQ_MODULE_QUEUE_BLOCK_NB, + BBQ_MODULE_QUEUE_BLOCK_ENTRY, + BBQ_MODULE_MAX, +}; #ifdef BBQ_MEMORY +#define BBQ_MEM_MAGIC 0xFF struct bbq_memory_s { aotmic_uint64 malloc_cnt; aotmic_uint64 malloc_size; @@ -100,7 +96,7 @@ struct bbq_memory_s { struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0}; #endif -void *bbq_malloc(enum bbq_module_e module, int socket_id, size_t size) { +static void *bbq_malloc(enum bbq_module module, int socket_id, size_t size) { void *ptr = NULL; if (socket_id >= 0) { ptr = numa_alloc_onnode(size, 0); @@ -118,7 +114,7 @@ void *bbq_malloc(enum bbq_module_e module, int socket_id, size_t size) { return ptr; } -void bbq_free(enum bbq_module_e module, int socket_id, void *ptr, size_t size) { +static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t size) { if (socket_id >= 0) { numa_free(ptr, size); } else { @@ -137,9 +133,9 @@ void bbq_free(enum bbq_module_e module, int socket_id, void *ptr, size_t size) { /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) { - uint64_t old_value; + uint64_t old_value = 0; do { - old_value = atomic_load(atom); // 读取当前值 + old_value = atomic_load(atom); } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd)); return old_value; @@ -165,7 +161,7 @@ uint32_t bbq_blocks_calc(uint32_t entries) { } /* 块初始化 */ -int block_init(struct bbq_s *q, struct bbq_block_s *block, bool cursor_init) { +int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { #ifdef BBQ_MEMORY // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, @@ -192,10 +188,10 @@ int block_init(struct bbq_s *q, struct bbq_block_s *block, bool cursor_init) { block->committed = ATOMIC_VAR_INIT(q->bs); block->allocated = ATOMIC_VAR_INIT(q->bs); block->reserved = ATOMIC_VAR_INIT(q->bs); - if (BBQ_POLIC_RETRY_NEW(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(q->bs); - } else { + if (BBQ_F_CHK_DROP_OLD(q->flags)) { block->consumed = ATOMIC_VAR_INIT(0); + } else { + block->consumed = ATOMIC_VAR_INIT(q->bs); } } @@ -203,7 +199,7 @@ int block_init(struct bbq_s *q, struct bbq_block_s *block, bool cursor_init) { } /* 块清理函数,与block_init成对*/ -void block_cleanup(struct bbq_s *q, struct bbq_block_s *block) { +void block_destory(struct bbq *q, struct bbq_block *block) { if (block->entries) { #ifdef BBQ_MEMORY bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, @@ -235,8 +231,8 @@ unsigned ceil_log2(uint64_t x) { return x == 1 ? 0 : floor_log2(x - 1) + 1; } -/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { +/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ +static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { int ret = 0; if (bbq_check_power_of_two(bn) == false) { @@ -254,29 +250,29 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t return NULL; } + if (name == NULL || strlen(name) >= BBQ_SYMBOL_MAX - 1) { + BBQ_ERR_LOG("invalid bbq name, max len is %d", BBQ_SYMBOL_MAX - 1); + return NULL; + } + if (numa_available() < 0) { // 不支持numa,设置 - socket_id = BBQ_INVALID_SOCKET; + socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq_s *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); + struct bbq *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); if (q == NULL) { BBQ_ERR_LOG("malloc for bbq queue error"); return NULL; } memset(q, 0, sizeof(*q)); - + ret = snprintf(q->name, sizeof(q->name), "%s", name); q->bn = bn; q->bs = bs; - q->obj_size = obj_size; - if (BBQ_COPY_POINTER(flags)) { - q->entry_size = sizeof(void *); - } else { - q->entry_size = obj_size; - } + q->entry_size = obj_size; q->socket_id = socket_id; - q->phead = 0; - q->chead = 0; + q->phead = ATOMIC_VAR_INIT(0); + q->chead = ATOMIC_VAR_INIT(0); q->flags = flags; q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); @@ -287,9 +283,9 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t memset(q->blocks, 0, sizeof(*q->blocks)); for (uint32_t i = 0; i < bn; ++i) { - // 第一个block不需要设置cursor_init_flag - bool cursor_init_flag = (i == 0 ? false : true); - ret = block_init(q, &(q->blocks[i]), cursor_init_flag); + // 第一个block不需要设置cursor_init + bool cursor_init = (i == 0 ? false : true); + ret = block_init(q, &(q->blocks[i]), cursor_init); if (ret != BBQ_OK) { BBQ_ERR_LOG("bbq block init error"); goto error; @@ -297,7 +293,7 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t } q->idx_bits = ceil_log2(bn); - q->off_bits = ceil_log2(bs) + 1; + q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 q->idx_mask = (1 << q->idx_bits) - 1; q->off_mask = (1 << q->off_bits) - 1; @@ -305,17 +301,20 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t return q; error: - bbq_ring_free(q); + bbq_destory(q); return NULL; } -/* 创建消息队列,bn和bs必须是2的N次幂,free_func先设置NULL */ -struct bbq_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags) { - return bbq_ring_create_bnbs_with_socket(bn, bs, obj_size, BBQ_INVALID_SOCKET, flags); +struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); +} + +struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq_s *bbq_create_with_socket(uint32_t count, size_t obj_size, int socket_id, unsigned int flags) { +struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) { if (bbq_check_power_of_two(count) == false || count == 1) { BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); return NULL; @@ -323,74 +322,72 @@ struct bbq_s *bbq_create_with_socket(uint32_t count, size_t obj_size, int socket uint32_t bn = bbq_blocks_calc(count); uint32_t bs = count / bn; - return bbq_ring_create_bnbs_with_socket(bn, bs, obj_size, socket_id, flags); + + return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags); } -/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,free_func先设置NULL */ -struct bbq_s *bbq_create(uint32_t count, size_t obj_size, unsigned int flags) { - // 传入无效socket_id,将使用malloc分配内存 - return bbq_create_with_socket(count, obj_size, BBQ_INVALID_SOCKET, flags); +struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) { + if (bbq_check_power_of_two(count) == false || count == 1) { + BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + return NULL; + } + + uint32_t bn = bbq_blocks_calc(count); + uint32_t bs = count / bn; + + return bbq_create_bnbs(name, bn, bs, socket_id, flags); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ -void bbq_ring_free(struct bbq_s *q) { +void bbq_destory(struct bbq *q) { if (q == NULL) { return; } for (uint32_t i = 0; i < q->bn; ++i) { - block_cleanup(q, &(q->blocks[i])); + block_destory(q, &(q->blocks[i])); } bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q)); } -// flags 第一位控制传入的是一维数组还是二维数组 -#define BBQ_F_SINGLE 0x0 -#define BBQ_F_ARRAY_1D 0x1 -#define BBQ_F_ARRAY_2D 0x2 -void commit_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void const *data, uint32_t flag) { +#define BBQ_DATA_TYPE_SINGLE 0x0 +#define BBQ_DATA_TYPE_ARRAY_1D 0x1 +#define BBQ_DATA_TYPE_ARRAY_2D 0x2 +void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) { size_t idx = e->off * q->entry_size; - if (BBQ_COPY_POINTER(q->flags)) { - // 指针入队列 - switch (flag) { - case BBQ_F_ARRAY_1D: { - char *tmp = (char *)data; + if (BBQ_F_CHK_VALUE(q->flags)) { + // 数据入队列 + switch (data_type) { + case BBQ_DATA_TYPE_ARRAY_1D: + case BBQ_DATA_TYPE_SINGLE: + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_ARRAY_2D: { + void **tmp = (void **)data; char *entry = &(e->block->entries[idx]); for (size_t i = 0; i < e->actual_burst; i++) { - memcpy(entry, &tmp, q->entry_size); + memcpy(entry, *tmp, q->entry_size); entry += q->entry_size; - tmp += q->obj_size; + tmp++; } break; } - case BBQ_F_ARRAY_2D: - case BBQ_F_SINGLE: - // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0]) - memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); - break; default: break; } } else { - // 数据入队列 - switch (flag) { - case BBQ_F_ARRAY_1D: - case BBQ_F_SINGLE: + // 指针入队列 + switch (data_type) { + case BBQ_DATA_TYPE_ARRAY_2D: + case BBQ_DATA_TYPE_SINGLE: + // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0]) memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); break; - case BBQ_F_ARRAY_2D: { - void **tmp = (void **)data; - char *entry = &(e->block->entries[idx]); - for (size_t i = 0; i < e->actual_burst; i++) { - memcpy(entry, *tmp, q->entry_size); - entry += q->entry_size; - tmp++; - } + case BBQ_DATA_TYPE_ARRAY_1D: break; - } default: break; } @@ -398,7 +395,7 @@ void commit_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void const *data, atomic_fetch_add(&e->block->committed, e->actual_burst); } -struct bbq_queue_state_s allocate_entry(struct bbq_s *q, struct bbq_block_s *block, uint32_t n) { +struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { struct bbq_queue_state_s state = {0}; if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; @@ -431,13 +428,19 @@ struct bbq_queue_state_s allocate_entry(struct bbq_s *q, struct bbq_block_s *blo return state; } -enum bbq_queue_state_e advance_phead(struct bbq_s *q, uint64_t ph) { +enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { // 获取下一个block uint64_t cur = 0; - struct bbq_block_s *n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); + struct bbq_block *n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); uint64_t ph_vsn = bbq_head_vsn(q, ph); - if (BBQ_POLIC_RETRY_NEW(q->flags)) { + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + cur = atomic_load(&n_blk->committed); + // 生产者避免前进到上一轮中尚未完全提交的区块 + if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) { + return BBQ_NOT_AVAILABLE; + } + } else { cur = atomic_load(&n_blk->consumed); uint64_t reserved; uint64_t consumed_off = bbq_off(q, cur); @@ -452,12 +455,6 @@ enum bbq_queue_state_e advance_phead(struct bbq_s *q, uint64_t ph) { return BBQ_NOT_AVAILABLE; } } - } else { - cur = atomic_load(&n_blk->committed); - // 生产者避免前进到上一轮中尚未完全提交的区块 - if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) { - return BBQ_NOT_AVAILABLE; - } } // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。 @@ -470,8 +467,53 @@ enum bbq_queue_state_e advance_phead(struct bbq_s *q, uint64_t ph) { return BBQ_SUCCESS; } +static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t *ph_ptr, struct bbq_block *blk_ph) { + uint64_t ch; + uint64_t ph; + if (ch_ptr != NULL) { + ch = *ch_ptr; + } else { + ch = atomic_load(&q->chead); + } + + if (ph_ptr != NULL) { + ph = *ph_ptr; + } else { + ph = atomic_load(&q->phead); + } + + uint64_t ph_idx = bbq_idx(q, ph); + uint64_t ch_idx = bbq_idx(q, ch); + uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed)); + + struct bbq_block *blk_ch = &(q->blocks[bbq_idx(q, ch)]); + uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved)); + + // "生产者"超过"消费者"的索引(即块)的个数 + uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { + // 这里idx_diff-1=-1也是正确。 + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); + } + + uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t ph_vsn = bbq_head_vsn(q, ph); + + if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) { + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); + } + + // 发生了覆盖 + if (ph_idx == ch_idx) { + // 当前块以及之前已生产的都作废 + return 0; + } + + return (idx_diff - 1) * q->bs + committed_off; +} + /* 消息队列入队 */ -static struct bbq_status __bbq_enqueue(struct bbq_s *q, void const *data, uint32_t n, uint32_t flag) { +static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t flag, uint32_t *wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || data == NULL) { @@ -482,7 +524,7 @@ static struct bbq_status __bbq_enqueue(struct bbq_s *q, void const *data, uint32 while (true) { // 获取当前phead,转为索引后获取到当前的blk uint64_t ph = atomic_load(&q->phead); - struct bbq_block_s *blk = &(q->blocks[bbq_idx(q, ph)]); + struct bbq_block *blk = &(q->blocks[bbq_idx(q, ph)]); struct bbq_queue_state_s state = allocate_entry(q, blk, n); switch (state.state) { @@ -490,38 +532,49 @@ static struct bbq_status __bbq_enqueue(struct bbq_s *q, void const *data, uint32 commit_entry(q, &state.e, data, flag); ret.actual_burst = state.e.actual_burst; ret.status = BBQ_OK; - return ret; + break; case BBQ_BLOCK_DONE: { - enum bbq_queue_state_e pstate = advance_phead(q, ph); - switch (pstate) { - case BBQ_NO_ENTRY: + enum bbq_queue_state pstate = advance_phead(q, ph); + if (pstate == BBQ_SUCCESS) { + continue; + } + + if (pstate == BBQ_NO_ENTRY) { ret.status = BBQ_QUEUE_FULL; - return ret; - case BBQ_NOT_AVAILABLE: + } else if (pstate == BBQ_NOT_AVAILABLE) { ret.status = BBQ_QUEUE_BUSY; - return ret; - case BBQ_SUCCESS: - continue; - default: + } else { ret.status = BBQ_ERROR; - return ret; } + break; } default: ret.status = BBQ_ERROR; - return ret; + break; } + + if (wait_consumed != NULL) { + *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk); + } + + return ret; } } -int bbq_enqueue(struct bbq_s *q, void *data) { - struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_F_SINGLE); +int bbq_enqueue(struct bbq *q, void *const *data) { + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +int bbq_enqueue_elem(struct bbq *q, void const *data) { + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } /* 更新成功 reserve成功的个数 */ uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { + // TODO:逻辑可以合并 if (n == 1) { // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 if (fetch_max(aotmic, reserved + 1) == reserved) { @@ -535,7 +588,7 @@ uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { } } -struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *block, uint32_t n) { +struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { while (true) { struct bbq_queue_state_s state; uint64_t reserved = atomic_load(&block->reserved); @@ -544,7 +597,9 @@ struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *bloc if (reserved_off < q->bs) { uint64_t consumed = atomic_load(&block->consumed); - if (BBQ_POLIC_RETRY_NEW(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) { + // TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理? + // TODO:这个情况可能出现? + if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) { // consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到 state.state = BBQ_BLOCK_DONE; state.vsn = reserved_svn; @@ -553,7 +608,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *bloc uint64_t committed = atomic_load(&block->committed); uint64_t committed_off = bbq_off(q, committed); - if (committed_off == reserved_off) { // TODO:多entry关注 + if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; return state; } @@ -589,26 +644,16 @@ struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *bloc } } -bool consume_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void *deq_data, uint32_t flag) { +bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) { size_t idx = e->off * q->entry_size; - if (BBQ_COPY_POINTER(q->flags)) { - switch (flag) { - case BBQ_F_ARRAY_2D: - case BBQ_F_SINGLE: - memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); - break; - case BBQ_F_ARRAY_1D: - default: - break; - } - } else { - switch (flag) { - case BBQ_F_ARRAY_1D: - case BBQ_F_SINGLE: + if (BBQ_F_CHK_VALUE(q->flags)) { + switch (data_type) { + case BBQ_DATA_TYPE_ARRAY_1D: + case BBQ_DATA_TYPE_SINGLE: memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); break; - case BBQ_F_ARRAY_2D: { + case BBQ_DATA_TYPE_ARRAY_2D: { void **tmp = (void **)deq_data; char *entry = &(e->block->entries[idx]); for (size_t i = 0; i < e->actual_burst; i++) { @@ -621,31 +666,47 @@ bool consume_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void *deq_data, default: break; } + } else { + switch (data_type) { + case BBQ_DATA_TYPE_ARRAY_2D: + case BBQ_DATA_TYPE_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_ARRAY_1D: + default: + break; + } } uint64_t allocated; - if (BBQ_POLIC_RETRY_NEW(q->flags)) { - atomic_fetch_add(&e->block->consumed, e->actual_burst); - } else { + if (BBQ_F_CHK_DROP_OLD(q->flags)) { // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? allocated = atomic_load(&e->block->allocated); // 预留的entry所在的块,已经被新生产的数据赶上了 if (bbq_cur_vsn(q, allocated) != e->vsn) { return false; } + } else { + atomic_fetch_add(&e->block->consumed, e->actual_burst); } return true; } -bool advance_chead(struct bbq_s *q, uint64_t ch, uint64_t ver) { +bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { uint64_t ch_idx = bbq_idx(q, ch); - struct bbq_block_s *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); + struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); uint64_t ch_vsn = bbq_head_vsn(q, ch); uint64_t committed = atomic_load(&n_blk->committed); uint64_t committed_vsn = bbq_cur_vsn(q, committed); - if (BBQ_POLIC_RETRY_NEW(q->flags)) { + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. + // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 + if (committed_vsn < ver + (ch_idx == 0)) + return false; + fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn)); + } else { if (committed_vsn != ch_vsn + 1) { // 消费者追上了生产者,下一块还未开始生产 return false; @@ -653,12 +714,6 @@ bool advance_chead(struct bbq_s *q, uint64_t ch, uint64_t ver) { uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1); fetch_max(&n_blk->consumed, new_vsn); fetch_max(&n_blk->reserved, new_vsn); - } else { - // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. - // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 - if (committed_vsn < ver + (ch_idx == 0)) - return false; - fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn)); } fetch_max(&q->chead, ch + 1); @@ -666,7 +721,7 @@ bool advance_chead(struct bbq_s *q, uint64_t ch, uint64_t ver) { } /* 消息队列出队 */ -static struct bbq_status __bbq_dequeue(struct bbq_s *q, void *deq_data, uint32_t n, uint32_t flag) { +static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { ret.status = BBQ_NULL_PTR; @@ -675,46 +730,54 @@ static struct bbq_status __bbq_dequeue(struct bbq_s *q, void *deq_data, uint32_t while (true) { uint64_t ch = atomic_load(&q->chead); - struct bbq_block_s *blk = &(q->blocks[bbq_idx(q, ch)]); - + struct bbq_block *blk = &(q->blocks[bbq_idx(q, ch)]); struct bbq_queue_state_s state; state = reserve_entry(q, blk, n); switch (state.state) { case BBQ_RESERVED: - if (consume_entry(q, &state.e, deq_data, flag)) { - ret.status = BBQ_OK; - ret.actual_burst = state.e.actual_burst; - return ret; - } else { + if (!consume_entry(q, &state.e, deq_data, data_type)) { continue; } + ret.status = BBQ_OK; + ret.actual_burst = state.e.actual_burst; + break; case BBQ_NO_ENTRY: ret.status = BBQ_QUEUE_EMPTY; - return ret; + break; case BBQ_NOT_AVAILABLE: ret.status = BBQ_QUEUE_BUSY; - return ret; + break; case BBQ_BLOCK_DONE: if (advance_chead(q, ch, state.vsn)) { continue; - } else { - ret.status = BBQ_QUEUE_EMPTY; - return ret; } + ret.status = BBQ_QUEUE_EMPTY; + break; default: ret.status = BBQ_ERROR; - return ret; + break; } + + if (wait_consumed != NULL) { + *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk); + } + + return ret; } } -int bbq_dequeue(struct bbq_s *q, void *deq_data) { - struct bbq_status ret = __bbq_dequeue(q, deq_data, 1, BBQ_F_SINGLE); +int bbq_dequeue(struct bbq *q, void **data) { + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +int bbq_dequeue_elem(struct bbq *q, void *data) { + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } -uint32_t bbq_max_burst(struct bbq_s *q, uint32_t n) { +uint32_t bbq_max_burst(struct bbq *q, uint32_t n) { uint32_t burst = n; if (burst > q->bs) { burst = q->bs; @@ -723,11 +786,15 @@ uint32_t bbq_max_burst(struct bbq_s *q, uint32_t n) { return burst; } -static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq_s *q, void *obj_table, uint32_t n) { +static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { return BBQ_NULL_PTR; } + if (!BBQ_F_CHK_VALUE(q->flags)) { + return BBQ_QUEUE_DATA_ERR; + } + uint32_t burst = 0; uint32_t ready = 0; void *obj = obj_table; @@ -735,18 +802,18 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq_s *q, void *obj_tab while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_dequeue(q, obj, burst, BBQ_F_ARRAY_1D); + ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed); if (ret.status != BBQ_OK) { break; } - obj += q->obj_size * ret.actual_burst; + obj += q->entry_size * ret.actual_burst; ready += ret.actual_burst; } return ready; } -static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq_s *q, void **obj_table, uint32_t n) { +static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { return BBQ_NULL_PTR; } @@ -758,7 +825,7 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq_s *q, void **obj_ta while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D); + ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed); if (ret.status != BBQ_OK) { break; } @@ -770,11 +837,15 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq_s *q, void **obj_ta } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_one_dimensional(struct bbq_s *q, void const *obj_table, uint32_t n) { +uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { return BBQ_NULL_PTR; } + if (!BBQ_F_CHK_VALUE(q->flags)) { + return BBQ_QUEUE_DATA_ERR; + } + uint32_t burst = 0; uint32_t ready = 0; void const *obj = obj_table; @@ -782,11 +853,11 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq_s *q, void const *obj_tabl while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_enqueue(q, obj, burst, BBQ_F_ARRAY_1D); + ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed); if (ret.status != BBQ_OK) { break; } - obj += q->obj_size * ret.actual_burst; + obj += q->entry_size * ret.actual_burst; ready += ret.actual_burst; } @@ -794,7 +865,7 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq_s *q, void const *obj_tabl } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_two_dimensional(struct bbq_s *q, void *const *obj_table, uint32_t n) { +uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { return BBQ_NULL_PTR; } @@ -806,7 +877,7 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq_s *q, void *const *obj_tab while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D); + ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed); if (ret.status != BBQ_OK) { break; } @@ -817,28 +888,60 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq_s *q, void *const *obj_tab return ready; } -uint32_t bbq_enqueue_burst_value(struct bbq_s *q, void const *obj_table, uint32_t n) { - return bbq_enqueue_burst_one_dimensional(q, obj_table, n); +bool bbq_empty(struct bbq *q) { + uint64_t phead = atomic_load(&q->phead); + uint64_t chead = atomic_load(&q->chead); + + uint64_t ph_vsn = bbq_head_vsn(q, phead); + uint64_t ch_vsn = bbq_head_vsn(q, chead); + uint64_t ph_idx = bbq_idx(q, phead); + uint64_t ch_idx = bbq_idx(q, chead); + + struct bbq_block *block; + + if (ph_idx != ch_idx) { + return false; + } + + block = &q->blocks[ph_idx]; + if (ph_vsn == ch_vsn) { + if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) { + return true; + } + } + + bbq_cursor reserved = atomic_load(&block->reserved); + uint64_t reserved_off = bbq_off(q, reserved); + + if (BBQ_F_CHK_DROP_OLD(q->flags) && + ph_vsn > ch_vsn && + reserved_off != q->bs) { + // 生产者追上了消费者,当前块以及未消费的全部 + // 如果reserved指向当前块的最后一个entry,可以移动head消费下一块,否则返回空 + return true; + } + + return false; } -uint32_t bbq_enqueue_burst_value_two_dimensional(struct bbq_s *q, void *const *obj_table, uint32_t n) { - return bbq_enqueue_burst_two_dimensional(q, obj_table, n); +uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_enqueue_burst_ptr(struct bbq_s *q, void *const *obj_table, uint32_t n) { - return bbq_enqueue_burst_two_dimensional(q, obj_table, n); +uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_enqueue_burst_ptr_one_dimensional(struct bbq_s *q, void const *obj_table, uint32_t n) { - return bbq_enqueue_burst_one_dimensional(q, obj_table, n); +uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_dequeue_burst_ptr(struct bbq_s *q, void **obj_table, uint32_t n) { - return bbq_dequeue_burst_two_dimensional(q, obj_table, n); +uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_dequeue_burst_value(struct bbq_s *q, void *obj_table, uint32_t n) { - return bbq_dequeue_burst_one_dimensional(q, obj_table, n); +uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed); } bool bbq_malloc_free_equal() { @@ -865,7 +968,7 @@ bool bbq_malloc_free_equal() { #endif } -bool bbq_check_array_bounds(struct bbq_s *q) { +bool bbq_debug_check_array_bounds(struct bbq *q) { #ifdef BBQ_MEMORY void *value = malloc(q->entry_size); memset(value, BBQ_MEM_MAGIC, q->entry_size); @@ -883,7 +986,7 @@ bool bbq_check_array_bounds(struct bbq_s *q) { return true; } -void bbq_memory_print() { +void bbq_debug_memory_print() { #ifdef BBQ_MEMORY for (int i = 0; i < BBQ_MODULE_MAX; i++) { uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); @@ -905,25 +1008,36 @@ void bbq_memory_print() { #endif } -#if 1 -// 调试用 -void bbq_block_print(struct bbq_s *q, struct bbq_block_s *block) { +void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { bbq_cursor allocated = atomic_load(&block->allocated); bbq_cursor committed = atomic_load(&block->committed); bbq_cursor reserved = atomic_load(&block->reserved); bbq_cursor consumed = atomic_load(&block->consumed); - printf(" allocated:%lu\n", bbq_off(q, allocated)); - printf(" committed:%lu\n", bbq_off(q, committed)); - printf(" reserved:%lu\n", bbq_off(q, reserved)); - printf(" consumed:%lu\n\n", bbq_off(q, consumed)); + printf(" allocated:%lu committed:%lu reserved:%lu", + bbq_off(q, allocated), bbq_off(q, committed), bbq_off(q, reserved)); + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + printf("\n"); + } else { + printf(" consumed:%lu\n", bbq_off(q, consumed)); + } } -void bbq_struct_print(struct bbq_s *q) { - printf("-------------\n"); - printf("ph idx:%lu vsn:%lu\n", bbq_idx(q, q->phead), bbq_head_vsn(q, q->phead)); - bbq_block_print(q, &(q->blocks[bbq_idx(q, q->phead)])); +void bbq_debug_struct_print(struct bbq *q) { + printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); + uint64_t phead = atomic_load(&q->phead); + uint64_t chead = atomic_load(&q->chead); - printf("ch idx:%lu vsn:%lu", bbq_idx(q, q->chead), bbq_head_vsn(q, q->chead)); - bbq_block_print(q, &(q->blocks[bbq_idx(q, q->chead)])); -} -#endif \ No newline at end of file + printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs); + printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead)); + + uint64_t ph_idx = bbq_idx(q, phead); + uint64_t ch_idx = bbq_idx(q, chead); + if (ph_idx != ch_idx) { + printf("block[%lu]\n", ph_idx); + bbq_debug_block_print(q, &(q->blocks[ph_idx])); + } + + printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead)); + printf("block[%lu]\n", ch_idx); + bbq_debug_block_print(q, &(q->blocks[ch_idx])); +} \ No newline at end of file diff --git a/bbq/test.c b/bbq/test.c deleted file mode 100644 index 7fa8ecf..0000000 --- a/bbq/test.c +++ /dev/null @@ -1,54 +0,0 @@ -/* - * @Description: 描述信息 - * @Date: 2024-06-13 21:45:38 - * @LastEditTime: 2024-06-17 18:28:18 - */ -#include -#include -#include -#include - -// 假设我们有一些uint16_t的指针,并且我们想要复制它们的地址 -void copy_pointer_array(void **dest, uint16_t **src, size_t num_pointers) { - // 使用memcpy复制指针数组的内容 - memcpy(dest, src, num_pointers * sizeof(uint16_t *)); -} - -// 一个简单的函数来展示如何使用这些指针 -void print_through_pointers(void *entries, size_t num_pointers) { - uint16_t **pointers = (uint16_t **)entries; - for (size_t i = 0; i < num_pointers; ++i) { - if (pointers[i]) { - printf("Pointer %zu points to %p, value: %u\n", i, (void *)pointers[i], *pointers[i]); - } else { - printf("Pointer %zu is NULL\n", i); - } - } -} - -struct test_s { - void *entries; -}; - -int main() { - // 示例:创建一些uint16_t的指针和值 - uint16_t val1 = 123, val2 = 456, val3 = 789; - uint16_t *ptrs[3] = {&val1, &val2, &val3}; - - printf("%p\n", ptrs[0]); - printf("%p\n", ptrs[1]); - printf("%p\n", ptrs[2]); - - // 使用void指针进行复制 - struct test_s tt; - tt.entries = malloc(3 * sizeof(uint16_t *)); - copy_pointer_array(tt.entries, (uint16_t **)ptrs, 3); - - // 通过void指针读取数据 - print_through_pointers(tt.entries, 3); - - // 清理内存 - free(tt.entries); - - return 0; -} \ No newline at end of file diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c index 9a96e33..f4726d9 100644 --- a/bbq/tests/common/test_queue.c +++ b/bbq/tests/common/test_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-18 04:11:12 + * @LastEditTime: 2024-06-24 10:16:28 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -10,25 +10,22 @@ #include "test_mix.h" #include #include -extern bool bbq_check_array_bounds(struct bbq_s *q); -extern struct bbq_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags); -extern struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags); +extern bool bbq_debug_check_array_bounds(struct bbq *q); +extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags); -uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx) { +uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { TEST_AVOID_WARNING(thread_idx); - return bbq_enqueue_burst_ptr(ring, obj_table, n); + return bbq_enqueue_burst(ring, obj_table, n, wait_consumed); } int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { - size_t obj_size = sizeof(test_data); - if (cfg->ring.block_count == 0) { - q->ring = bbq_create_with_socket(cfg->ring.entries_cnt, obj_size, 0, BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR); + q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); } else { - q->ring = bbq_ring_create_bnbs_with_socket(cfg->ring.block_count, - cfg->ring.entries_cnt / cfg->ring.block_count, - obj_size, - 0, BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR); + q->ring = bbq_create_bnbs("test_bbq", cfg->ring.block_count, + cfg->ring.entries_cnt / cfg->ring.block_count, + BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); } if (q->ring == NULL) { @@ -36,11 +33,11 @@ int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { return BBQ_NULL_PTR; } - q->ring_free_f = (test_ring_free_f)bbq_ring_free; + q->ring_free_f = (test_ring_free_f)bbq_destory; q->enqueue_f = (test_ring_enqueue_f)bbq_enqueue; q->dequeue_f = (test_ring_dequeue_f)bbq_dequeue; - q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst_ptr; q->enqueue_burst_f = (test_enqueue_burst_f)test_bbq_enqueue_burst; + q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst; return 0; } @@ -110,7 +107,7 @@ void test_data_destory(test_data **data, size_t cnt) { uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) { uint32_t enqueue_cnt = 0; test_time_metric op_use_start = test_clock_time_get(); - enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx); + enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, NULL); *op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start); return enqueue_cnt; @@ -120,7 +117,7 @@ uint32_t test_exec_dequeue(test_queue_s *q, test_data **data, size_t burst_cnt, uint32_t dequeue_cnt = 0; test_time_metric op_use_start = test_clock_time_get(); - dequeue_cnt = q->dequeue_burst_f(q->ring, (void **)data, burst_cnt); + dequeue_cnt = q->dequeue_burst_f(q->ring, (void **)data, burst_cnt, NULL); *op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start); return dequeue_cnt; diff --git a/bbq/tests/common/test_queue.h b/bbq/tests/common/test_queue.h index 723f0b3..8d40e92 100644 --- a/bbq/tests/common/test_queue.h +++ b/bbq/tests/common/test_queue.h @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-17 18:17:24 + * @LastEditTime: 2024-06-21 18:05:44 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -15,8 +15,8 @@ typedef void (*test_ring_free_f)(void *ring); typedef int (*test_ring_enqueue_f)(void *ring, void *obj); typedef int (*test_ring_dequeue_f)(void *ring, void *obj); -typedef uint32_t (*test_enqueue_burst_f)(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx); -typedef uint32_t (*test_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n); +typedef uint32_t (*test_enqueue_burst_f)(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed); +typedef uint32_t (*test_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n, uint32_t *wait_consumed); typedef bool (*test_ring_empty_f)(void *ring); typedef struct { @@ -91,10 +91,10 @@ extern void test_wait_all_threads_ready(test_ctl *ctl); extern void test_queue_destory(test_queue_s *q); extern int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q); extern void test_merge_all_data(test_exit_data **exit_data, uint32_t thread_cnt, test_merge_s *merge); -extern uint64_t bbq_idx(struct bbq_s *q, uint64_t x); -extern uint64_t bbq_off(struct bbq_s *q, uint64_t x); -extern uint64_t bbq_head_vsn(struct bbq_s *q, uint64_t x); -extern uint64_t bbq_cur_vsn(struct bbq_s *q, uint64_t x); +extern uint64_t bbq_idx(struct bbq *q, uint64_t x); +extern uint64_t bbq_off(struct bbq *q, uint64_t x); +extern uint64_t bbq_head_vsn(struct bbq *q, uint64_t x); +extern uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x); extern test_data **test_data_create(size_t cnt); extern void test_data_destory(test_data **data, size_t cnt); #endif \ No newline at end of file diff --git a/bbq/tests/unittest/ut.h b/bbq/tests/unittest/ut.h index b36c2ee..a6ef9ad 100644 --- a/bbq/tests/unittest/ut.h +++ b/bbq/tests/unittest/ut.h @@ -17,7 +17,7 @@ typedef struct { } testdata_s; typedef struct { - struct bbq_s *q; + struct bbq *q; uint32_t usleep; // 该线程每次执行间隔睡眠时间 bool until_end; // 循环读取,直到队列空或满 uint64_t thread_exec_times; // 每个线程生产/消费次数 diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc index fe78e03..64ee3f6 100644 --- a/bbq/tests/unittest/ut_example.cc +++ b/bbq/tests/unittest/ut_example.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-18 15:11:23 + * @LastEditTime: 2024-06-24 10:13:21 * @Email: liuyu@geedgenetworks.com * @Describe: 简单的测试用例,测试基本功能 */ @@ -11,11 +11,10 @@ extern "C" { #include "test_queue.h" #include "ut.h" extern bool bbq_malloc_free_equal(); -extern void bbq_memory_print(); -extern bool bbq_check_array_bounds(struct bbq_s *q); -extern void bbq_struct_print(struct bbq_s *q); -extern uint32_t bbq_enqueue_burst_ptr_one_dimensional(struct bbq_s *q, void const *obj_table, uint32_t n); -extern uint32_t bbq_enqueue_burst_value_two_dimensional(struct bbq_s *q, void *const *obj_table, uint32_t n); +extern void bbq_debug_memory_print(); +extern bool bbq_debug_check_array_bounds(struct bbq *q); +extern void bbq_struct_print(struct bbq *q); +extern uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed); } #define BUF_CNT 4096 @@ -54,22 +53,22 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) { uint16_t *deq_data = NULL; // 创建队列 - struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR); - EXPECT_TRUE(q); + struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); // 全部入队成功 for (uint32_t i = 0; i < 4000; i++) { - if (bbq_enqueue(q, &enq_table1[i]) == 0) { + if (bbq_enqueue(q, (void **)&enq_table1[i]) == 0) { cnt++; } } // 部分入队成功 for (uint32_t i = 0; i < 4000; i++) { - if (bbq_enqueue(q, &enq_table2[i]) == 0) { + if (bbq_enqueue(q, (void **)&enq_table2[i]) == 0) { cnt++; } } @@ -79,7 +78,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) { cnt = 0; for (uint32_t i = 0; i < BUF_CNT; i++) { - ret = bbq_dequeue(q, &deq_data); + ret = bbq_dequeue(q, (void **)&deq_data); if (ret == 0) { EXPECT_EQ(*deq_data, TEST_DATA_MAGIC); cnt++; @@ -89,7 +88,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) { EXPECT_EQ(cnt, BUF_CNT); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); } TEST_F(bbq_example, single_retry_new_cp_value) { @@ -98,22 +97,22 @@ TEST_F(bbq_example, single_retry_new_cp_value) { uint16_t deq_data; // 创建队列 - struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_VALUE); - EXPECT_TRUE(q); + struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); // 全部入队成功 for (uint32_t i = 0; i < 4000; i++) { - if (bbq_enqueue(q, enq_table1[i]) == 0) { + if (bbq_enqueue(q, (void **)enq_table1[i]) == 0) { cnt++; } } // 部分入队成功 for (uint32_t i = 0; i < 4000; i++) { - if (bbq_enqueue(q, enq_table2[i]) == 0) { + if (bbq_enqueue_elem(q, enq_table2[i]) == 0) { cnt++; } } @@ -123,7 +122,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) { cnt = 0; for (uint32_t i = 0; i < BUF_CNT; i++) { - ret = bbq_dequeue(q, &deq_data); + ret = bbq_dequeue_elem(q, &deq_data); if (ret == 0) { EXPECT_EQ(deq_data, TEST_DATA_MAGIC); cnt++; @@ -133,7 +132,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) { EXPECT_EQ(cnt, BUF_CNT); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY); } TEST_F(bbq_example, single_drop_old_cp_pointer) { @@ -144,18 +143,18 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { uint64_t second_cnt = 1000; // 创建队列 - struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_PTR); - EXPECT_TRUE(q); + struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + ASSERT_NE(q, nullptr); EXPECT_LT(second_cnt, q->bs * q->bn); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); // 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈) uint32_t loop = 3; for (uint32_t n = 0; n < loop; n++) { for (uint32_t i = 0; i < first_cnt; i++) { - ret = bbq_enqueue(q, &enq_table1[i]); + ret = bbq_enqueue(q, (void **)&enq_table1[i]); if (ret == 0) { cnt++; } @@ -166,7 +165,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { // 全部入队成功 cnt = 0; for (uint32_t i = 0; i < second_cnt; i++) { - if (bbq_enqueue(q, &enq_table2[i]) == 0) { + if (bbq_enqueue(q, (void **)&enq_table2[i]) == 0) { cnt++; } } @@ -174,7 +173,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { cnt = 0; for (uint32_t i = 0; i < BUF_CNT; i++) { - ret = bbq_dequeue(q, &deq_data); + ret = bbq_dequeue(q, (void **)&deq_data); if (ret == 0) { EXPECT_EQ(*deq_data, TEST_DATA_MAGIC); cnt++; @@ -185,7 +184,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { EXPECT_EQ(cnt, second_cnt - q->bs); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); } TEST_F(bbq_example, single_drop_old_cp_value) { @@ -196,18 +195,18 @@ TEST_F(bbq_example, single_drop_old_cp_value) { uint64_t second_cnt = 1000; // 创建队列 - struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_VALUE); - EXPECT_TRUE(q); + struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + ASSERT_NE(q, nullptr); EXPECT_LT(second_cnt, q->bs * q->bn); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY); // 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈) uint32_t loop = 3; for (uint32_t n = 0; n < loop; n++) { for (uint32_t i = 0; i < first_cnt; i++) { - ret = bbq_enqueue(q, enq_table1[i]); + ret = bbq_enqueue_elem(q, enq_table1[i]); if (ret == 0) { cnt++; } @@ -218,7 +217,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) { // 全部入队成功 cnt = 0; for (uint32_t i = 0; i < second_cnt; i++) { - if (bbq_enqueue(q, enq_table2[i]) == 0) { + if (bbq_enqueue_elem(q, enq_table2[i]) == 0) { cnt++; } } @@ -226,7 +225,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) { cnt = 0; for (uint32_t i = 0; i < BUF_CNT; i++) { - ret = bbq_dequeue(q, &deq_data); + ret = bbq_dequeue_elem(q, &deq_data); if (ret == 0) { EXPECT_EQ(deq_data, TEST_DATA_MAGIC); cnt++; @@ -237,41 +236,44 @@ TEST_F(bbq_example, single_drop_old_cp_value) { EXPECT_EQ(cnt, second_cnt - q->bs); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY); } TEST_F(bbq_example, burst_retry_new_cp_value) { - struct bbq_s *q; + struct bbq *q; uint32_t ret1 = 0; uint32_t ret2 = 0; uint64_t first_cnt = 4000; uint64_t second_cnt = 1000; - uint16_t deq_table1[BUF_CNT] = {0}; uint16_t *deq_table2 = (uint16_t *)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t) * BUF_CNT); + uint32_t wait_consumed = 0; // 创建队列 - q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_VALUE); - EXPECT_TRUE(q); + q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); EXPECT_LT(first_cnt, q->bn * q->bs); // 批量入队(全部成功) - // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快 - ret1 = bbq_enqueue_burst_value(q, (void const *)enq_table3, first_cnt); + ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); + EXPECT_EQ(wait_consumed, ret1); // 批量入队(部分成功) - // 不推荐,但可用于特殊场景。由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。 - ret2 = bbq_enqueue_burst_value_two_dimensional(q, (void *const *)enq_table2, second_cnt); + // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。 + ret2 = bbq_enqueue_burst_elem_two_dimensional(q, (void *const *)enq_table2, second_cnt, &wait_consumed); EXPECT_EQ(ret2, BUF_CNT - ret1); + EXPECT_EQ(wait_consumed, ret1 + ret2); // 出队列(全部成功) - ret1 = bbq_dequeue_burst_value(q, (void *)deq_table1, first_cnt); + ret1 = bbq_dequeue_burst_elem(q, (void *)deq_table1, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); + EXPECT_EQ(wait_consumed, ret2); // 出队列(部分成功) - ret2 = bbq_dequeue_burst_value(q, (void *)deq_table2, second_cnt); + ret2 = bbq_dequeue_burst_elem(q, (void *)deq_table2, second_cnt, &wait_consumed); EXPECT_EQ(ret2, BUF_CNT - ret1); + EXPECT_EQ(wait_consumed, 0); // 验证数据 for (uint32_t i = 0; i < ret1; i++) { @@ -282,43 +284,45 @@ TEST_F(bbq_example, burst_retry_new_cp_value) { EXPECT_EQ(deq_table2[i], TEST_DATA_MAGIC) << "i :" << i; } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); test_free(TEST_MODULE_DATA, deq_table2); } TEST_F(bbq_example, burst_retry_new_cp_pointer) { - struct bbq_s *q; + struct bbq *q; uint32_t ret1 = 0; uint32_t ret2 = 0; uint64_t first_cnt = 4000; uint64_t second_cnt = 1000; - + uint32_t wait_consumed = 0; uint16_t *deq_table1[BUF_CNT] = {0}; uint16_t **deq_table2 = (uint16_t **)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT); // 创建队列 - q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR); - EXPECT_TRUE(q); + q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); EXPECT_LT(first_cnt, q->bn * q->bs); // 批量入队(全部成功) - // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快 - ret1 = bbq_enqueue_burst_ptr(q, (void *const *)enq_table1, first_cnt); + ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); + EXPECT_EQ(wait_consumed, ret1); // 批量入队(部分成功) - // 不推荐,但可用于特殊场景。将数组成员的每个地址入队,需要循环取成员地址。 - ret2 = bbq_enqueue_burst_ptr_one_dimensional(q, (const void *)enq_table3, second_cnt); + ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed); EXPECT_EQ(ret2, BUF_CNT - ret1); + EXPECT_EQ(wait_consumed, ret1 + ret2); // 出队列(全部成功) - ret1 = bbq_dequeue_burst_ptr(q, (void **)deq_table1, first_cnt); + ret1 = bbq_dequeue_burst(q, (void **)deq_table1, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); + EXPECT_EQ(wait_consumed, ret2); // 出队列(部分成功) - ret2 = bbq_dequeue_burst_ptr(q, (void **)deq_table2, second_cnt); + ret2 = bbq_dequeue_burst(q, (void **)deq_table2, second_cnt, &wait_consumed); EXPECT_EQ(ret2, BUF_CNT - ret1); + EXPECT_EQ(wait_consumed, 0); // 验证数据 for (uint32_t i = 0; i < ret1; i++) { @@ -329,85 +333,98 @@ TEST_F(bbq_example, burst_retry_new_cp_pointer) { EXPECT_EQ(*deq_table2[i], TEST_DATA_MAGIC) << "i :" << i; } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); test_free(TEST_MODULE_DATA, deq_table2); } TEST_F(bbq_example, burst_drop_old_cp_pointer) { - struct bbq_s *q; + struct bbq *q; uint32_t ret1 = 0; uint32_t ret2 = 0; uint64_t first_cnt = BUF_CNT; uint64_t second_cnt = 1000; - + uint32_t wait_consumed = 0; uint16_t *deq_table1[BUF_CNT] = {0}; uint16_t **deq_table2 = (uint16_t **)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT); // 创建队列 - q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_PTR); - EXPECT_TRUE(q); + q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + ASSERT_NE(q, nullptr); + EXPECT_GT(second_cnt, q->bs); EXPECT_LT(second_cnt, q->bs * q->bn); - // 批量入队(全部成功) - // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快 - ret1 = bbq_enqueue_burst_ptr(q, (void *const *)enq_table1, first_cnt); + // 批量入队(全部成功,入队个数等于队列总容量,未发生覆盖) + ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); + EXPECT_EQ(wait_consumed, ret1); // 批量入队(全部成功),覆盖了旧数据 - // 不推荐,但可用于特殊场景。将数组成员的每个地址入队,需要循环取成员地址。 - ret2 = bbq_enqueue_burst_ptr_one_dimensional(q, (const void *)enq_table3, second_cnt); + ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed); EXPECT_EQ(ret2, second_cnt); + EXPECT_EQ(wait_consumed, second_cnt - q->bs); // 出队列(部分成功) - // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。 - ret1 = bbq_dequeue_burst_ptr(q, (void **)deq_table1, BUF_CNT); + // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。本例中第一个完整块作废。 + ret1 = bbq_dequeue_burst(q, (void **)deq_table1, BUF_CNT, &wait_consumed); EXPECT_EQ(ret1, second_cnt - q->bs); + EXPECT_EQ(wait_consumed, 0); // 验证数据 for (uint32_t i = 0; i < ret1; i++) { EXPECT_EQ(*deq_table1[i], TEST_DATA_MAGIC) << "i :" << i; } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + // 此时生产者和消费者在同一块上,入队个数为队列容量的N倍,由于发生了覆盖,且依旧在同一块上,数据全作废 + for (uint32_t loop = 0; loop < 3; loop++) { + ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, BUF_CNT, &wait_consumed); + EXPECT_EQ(ret1, BUF_CNT); + EXPECT_TRUE(bbq_empty(q)); + EXPECT_EQ(wait_consumed, 0); + } + + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); test_free(TEST_MODULE_DATA, deq_table2); } TEST_F(bbq_example, burst_drop_old_cp_value) { - struct bbq_s *q; + struct bbq *q; uint32_t ret1 = 0; uint32_t ret2 = 0; uint64_t first_cnt = BUF_CNT; uint64_t second_cnt = 1000; - + uint32_t wait_consumed = 0; uint16_t deq_table1[BUF_CNT] = {0}; // 创建队列 - q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_VALUE); - EXPECT_TRUE(q); + q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + ASSERT_NE(q, nullptr); + EXPECT_GT(second_cnt, q->bs); EXPECT_LT(second_cnt, q->bs * q->bn); // 批量入队(全部成功) - // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快 - ret1 = bbq_enqueue_burst_value(q, (void const *)enq_table3, first_cnt); + ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); + EXPECT_EQ(wait_consumed, ret1); // 批量入队(全部成功),覆盖了旧数据 - // 不推荐,但可用于特殊场景。将数组成员的每个地址入队,需要循环取成员地址。 - ret2 = bbq_enqueue_burst_value_two_dimensional(q, (void *const *)enq_table1, second_cnt); + // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。 + ret2 = bbq_enqueue_burst_elem_two_dimensional(q, (void *const *)enq_table1, second_cnt, &wait_consumed); EXPECT_EQ(ret2, second_cnt); + EXPECT_EQ(wait_consumed, second_cnt - q->bs); // 出队列(部分成功) // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。 - ret1 = bbq_dequeue_burst_value(q, (void *)deq_table1, BUF_CNT); + ret1 = bbq_dequeue_burst_elem(q, (void *)deq_table1, BUF_CNT, &wait_consumed); EXPECT_EQ(ret1, second_cnt - q->bs); + EXPECT_EQ(wait_consumed, 0); // 验证数据 for (uint32_t i = 0; i < ret1; i++) { EXPECT_EQ(deq_table1[i], TEST_DATA_MAGIC) << "i :" << i; } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); } \ No newline at end of file diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc index dd95e7e..b8bff37 100644 --- a/bbq/tests/unittest/ut_head_cursor.cc +++ b/bbq/tests/unittest/ut_head_cursor.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-17 11:15:19 + * @LastEditTime: 2024-06-24 10:15:57 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -9,10 +9,9 @@ extern "C" { #include "test_queue.h" #include "ut.h" extern bool bbq_malloc_free_equal(); -extern void bbq_memory_print(); -extern bool bbq_check_array_bounds(struct bbq_s *q); -extern struct bbq_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags); -extern struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags); +extern void bbq_debug_memory_print(); +extern bool bbq_debug_check_array_bounds(struct bbq *q); +extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); } class bbq_head_cursor : public testing::Test { // 继承了 testing::Test @@ -31,45 +30,43 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test } }; -void expect_phead(struct bbq_s *q, uint64_t idx, uint64_t vsn, int line) { +void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { EXPECT_EQ(bbq_idx(q, q->phead), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line; } -void expect_chead(struct bbq_s *q, uint64_t idx, uint64_t vsn, int line) { +void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { EXPECT_EQ(bbq_idx(q, q->chead), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line; } -void expect_eq_allocated(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->allocated), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line; } -void expect_eq_committed(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->committed), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line; } -void expect_eq_consumed(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->consumed), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line; } -void expect_eq_reserved(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->reserved), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line; } // 初始化状态 TEST_F(bbq_head_cursor, init) { - test_memory_counter_clear(); - - struct bbq_s *q; + struct bbq *q; uint32_t bn = 2; uint32_t bs = 4; - q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW); - EXPECT_TRUE(q); + q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); // 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目 EXPECT_EQ(q->phead, 0); @@ -85,15 +82,15 @@ TEST_F(bbq_head_cursor, init) { expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__); expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__); } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); } void ut_produce_something(uint32_t produce_cnt) { int ret = 0; - struct bbq_s *q; + struct bbq *q; uint32_t bn = 8; uint32_t bs = 4096; int enqueue_data = TEST_DATA_MAGIC; @@ -102,12 +99,12 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_GT(produce_cnt, 0); EXPECT_LE(produce_cnt, bs); - q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW); - EXPECT_TRUE(q); + q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); // 生产produce_cnt for (uint32_t i = 0; i < produce_cnt; i++) { - ret = bbq_enqueue(q, &enqueue_data); + ret = bbq_enqueue_elem(q, &enqueue_data); EXPECT_TRUE(ret == BBQ_OK); } @@ -120,7 +117,7 @@ void ut_produce_something(uint32_t produce_cnt) { // 消费完 for (uint32_t i = 0; i < produce_cnt; i++) { - ret = bbq_dequeue(q, &dequeue_data); + ret = bbq_dequeue_elem(q, &dequeue_data); EXPECT_TRUE(ret == BBQ_OK); EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC); } @@ -138,13 +135,11 @@ void ut_produce_something(uint32_t produce_cnt) { expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__); expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__); } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); } // 在第一块内生产,然后被消费完 TEST_F(bbq_head_cursor, produce_something) { - test_memory_counter_clear(); - ut_produce_something(1); ut_produce_something(567); ut_produce_something(789); @@ -155,7 +150,7 @@ TEST_F(bbq_head_cursor, produce_something) { void ut_produce_next_block(uint32_t over) { int ret = 0; - struct bbq_s *q; + struct bbq *q; uint32_t bn = 8; uint32_t bs = 4096; uint32_t produce_cnt = bs + over; @@ -165,12 +160,12 @@ void ut_produce_next_block(uint32_t over) { EXPECT_GT(over, 0); EXPECT_LT(over, bs); - q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW); - EXPECT_TRUE(q); + q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); // 生产至第二块的第一个entry for (uint32_t i = 0; i < produce_cnt; i++) { - ret = bbq_enqueue(q, &enqueue_data); + ret = bbq_enqueue_elem(q, &enqueue_data); EXPECT_TRUE(ret == BBQ_OK); } @@ -188,7 +183,7 @@ void ut_produce_next_block(uint32_t over) { // 消费完 for (uint32_t i = 0; i < produce_cnt; i++) { - ret = bbq_dequeue(q, &dequeue_data); + ret = bbq_dequeue_elem(q, &dequeue_data); EXPECT_TRUE(ret == BBQ_OK); EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC); } @@ -205,14 +200,12 @@ void ut_produce_next_block(uint32_t over) { expect_eq_reserved(q, &q->blocks[1], over, 1, __LINE__); expect_eq_consumed(q, &q->blocks[1], over, 1, __LINE__); - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); } // 第一块生产完毕,第二块生产了若干,然后被消费完 TEST_F(bbq_head_cursor, produce_next_block) { - test_memory_counter_clear(); - ut_produce_next_block(1); ut_produce_next_block(123); ut_produce_next_block(456); @@ -223,26 +216,26 @@ TEST_F(bbq_head_cursor, produce_next_block) { void ut_produce_all_loop(uint32_t loop) { int ret = 0; - struct bbq_s *q; + struct bbq *q; uint32_t bn = 8; uint32_t bs = 4096; uint32_t produce_cnt = bn * bs; int enqueue_data = TEST_DATA_MAGIC; int dequeue_data = 0; - q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW); - EXPECT_TRUE(q); + q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); for (uint32_t cnt = 0; cnt < loop; cnt++) { // 所有entry生产完毕 for (uint32_t i = 0; i < produce_cnt; i++) { - ret = bbq_enqueue(q, &enqueue_data); + ret = bbq_enqueue_elem(q, &enqueue_data); EXPECT_TRUE(ret == BBQ_OK); } // 消费完 for (uint32_t i = 0; i < produce_cnt; i++) { - ret = bbq_dequeue(q, &dequeue_data); + ret = bbq_dequeue_elem(q, &dequeue_data); EXPECT_TRUE(ret == BBQ_OK); EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC); } @@ -263,14 +256,12 @@ void ut_produce_all_loop(uint32_t loop) { expect_eq_consumed(q, &q->blocks[i], bs, loop, __LINE__); } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); } // 完成多轮的满生产和满消费 TEST_F(bbq_head_cursor, produce_all_loop) { - test_memory_counter_clear(); - ut_produce_all_loop(1); ut_produce_all_loop(10); ut_produce_all_loop(23); @@ -280,30 +271,31 @@ TEST_F(bbq_head_cursor, produce_all_loop) { } TEST_F(bbq_head_cursor, retry_new_full_empty) { - test_memory_counter_clear(); int ret = 0; uint32_t entries_cnt = 4096; uint32_t loop = 1000; - struct bbq_s *q; + struct bbq *q; int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt); int tmp_data = 0; EXPECT_TRUE(data); - q = bbq_create(entries_cnt, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW); - EXPECT_TRUE(q); + q = bbq_create_elem("test_bbq", entries_cnt, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + ASSERT_NE(q, nullptr); + EXPECT_TRUE(bbq_empty(q)); for (uint32_t i = 0; i < loop; i++) { // 入满队 for (uint32_t j = 0; j < entries_cnt; j++) { data[j] = (i + 1) * j; - ret = bbq_enqueue(q, &data[j]); + ret = bbq_enqueue_elem(q, &data[j]); EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret; + EXPECT_FALSE(bbq_empty(q)); } // 满队再入队 for (uint32_t j = 0; j < entries_cnt / 3; j++) { - ret = bbq_enqueue(q, &data[j]); + ret = bbq_enqueue_elem(q, &data[j]); EXPECT_TRUE(ret == BBQ_QUEUE_FULL); } @@ -319,14 +311,16 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { // 全出队 for (uint32_t j = 0; j < entries_cnt; j++) { - ret = bbq_dequeue(q, &tmp_data); + EXPECT_FALSE(bbq_empty(q)); + ret = bbq_dequeue_elem(q, &tmp_data); EXPECT_TRUE(ret == BBQ_OK); EXPECT_EQ(tmp_data, data[j]); } + EXPECT_TRUE(bbq_empty(q)); // 空出队再出队 for (uint32_t j = 0; j < entries_cnt / 2; j++) { - ret = bbq_dequeue(q, &tmp_data); + ret = bbq_dequeue_elem(q, &tmp_data); EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY); } @@ -340,15 +334,13 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { } test_free(TEST_MODULE_UTEST, data); - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); } TEST_F(bbq_head_cursor, mpsc_faa) { - test_memory_counter_clear(); - test_info_s test_info = { .cfg = { .base = { @@ -408,41 +400,44 @@ TEST_F(bbq_head_cursor, mpsc_faa) { } test_free(TEST_MODULE_UTEST, exit_data); test_threads_destory(&test_info, threads); - EXPECT_TRUE(bbq_check_array_bounds((struct bbq_s *)q.ring)); + EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring)); test_queue_destory(&q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); } -TEST_F(bbq_head_cursor, drop_old_full_empty1) { - test_memory_counter_clear(); +TEST_F(bbq_head_cursor, drop_old_full_empty) { int ret = 0; uint32_t bn = 2; uint32_t bs = 4; uint32_t loop = 1000; - struct bbq_s *q; + struct bbq *q; int tmp_data = 0; - q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_DROP_OLD); - EXPECT_TRUE(q); + q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + ASSERT_NE(q, nullptr); + EXPECT_TRUE(bbq_empty(q)); for (uint32_t j = 0; j < loop; j++) { // 入满队列 for (uint32_t i = 0; i < bn * bs; i++) { - ret = bbq_enqueue(q, &i); + ret = bbq_enqueue_elem(q, &i); EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret; + EXPECT_FALSE(bbq_empty(q)); } // 全出队 for (uint32_t i = 0; i < bn * bs; i++) { - ret = bbq_dequeue(q, &tmp_data); + EXPECT_FALSE(bbq_empty(q)); + ret = bbq_dequeue_elem(q, &tmp_data); EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret; EXPECT_EQ(tmp_data, i); } + EXPECT_TRUE(bbq_empty(q)); // 空队再出队,失败 for (uint32_t i = 0; i < bn * bs; i++) { - ret = bbq_dequeue(q, &tmp_data); + ret = bbq_dequeue_elem(q, &tmp_data); EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret; } @@ -455,32 +450,41 @@ TEST_F(bbq_head_cursor, drop_old_full_empty1) { EXPECT_EQ(q->blocks[i].consumed.load(), 0); } } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); } -TEST_F(bbq_head_cursor, drop_old_full_empty2) { - test_memory_counter_clear(); +TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { int ret = 0; uint32_t bn = 2; uint32_t bs = 4; uint32_t loop = 1000; uint32_t over_cnt = bs + 2; - struct bbq_s *q; + struct bbq *q; EXPECT_EQ(over_cnt / bs, 1); int tmp_data = 0; - q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_DROP_OLD); - EXPECT_TRUE(q); + q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + ASSERT_NE(q, nullptr); + EXPECT_TRUE(bbq_empty(q)); // 入满队列,再入over_cnt for (uint32_t i = 0; i < bn * bs * loop + over_cnt; i++) { - ret = bbq_enqueue(q, &i); + ret = bbq_enqueue_elem(q, &i); EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret; + + uint32_t tmpA = i / (bn * bs); + uint32_t tmpB = i % (bn * bs); + if (tmpA > 0 && (tmpB < bs)) { + // 覆盖第一个块时,整个块被作废,因此都是empty,从第二个块开始可读取 + EXPECT_TRUE(bbq_empty(q)) << "i " << i << "tmpA " << tmpA << "tmpB " << tmpB; + } else { + EXPECT_FALSE(bbq_empty(q)); + } } expect_phead(q, 1, loop, __LINE__); @@ -503,12 +507,13 @@ TEST_F(bbq_head_cursor, drop_old_full_empty2) { // 队列中的数据全出队 for (uint32_t i = 0; i < over_cnt - bs; i++) { - ret = bbq_dequeue(q, &tmp_data); + ret = bbq_dequeue_elem(q, &tmp_data); EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret; } for (uint32_t i = 0; i < bn * bs; i++) { - ret = bbq_dequeue(q, &tmp_data); + EXPECT_TRUE(bbq_empty(q)); + ret = bbq_dequeue_elem(q, &tmp_data); EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret; } @@ -527,8 +532,9 @@ TEST_F(bbq_head_cursor, drop_old_full_empty2) { i == 1 ? loop + 1 : 0, __LINE__); EXPECT_EQ(q->blocks[i].consumed.load(), 0); } - EXPECT_TRUE(bbq_check_array_bounds(q)); - bbq_ring_free(q); + + EXPECT_TRUE(bbq_debug_check_array_bounds(q)); + bbq_destory(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); } \ No newline at end of file diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc index cb7c12a..daf6c97 100644 --- a/bbq/tests/unittest/ut_mix.cc +++ b/bbq/tests/unittest/ut_mix.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-17 11:14:19 + * @LastEditTime: 2024-06-19 22:49:24 * @Email: liuyu@geedgenetworks.com * @Describe: bbq除了队列操作外,其他函数的测试 */ @@ -53,8 +53,6 @@ void *fetch_max_thread_func(void *arg) { } TEST_F(bbq_mix, fetch_max) { - test_memory_counter_clear(); - uint64_t ret = 0; ut_fetch_arg arg = {}; arg.data.store(1); // 初始化1 @@ -87,8 +85,6 @@ TEST_F(bbq_mix, fetch_max) { } TEST_F(bbq_mix, power_of_two) { - test_memory_counter_clear(); - uint32_t tmp = 0; uint32_t max = pow(2, 32) - 1; @@ -114,8 +110,6 @@ TEST_F(bbq_mix, power_of_two) { } TEST_F(bbq_mix, bbq_blocks_calc) { - test_memory_counter_clear(); - uint32_t tmp = 0; uint32_t max = pow(2, 32) - 1; @@ -148,8 +142,6 @@ TEST_F(bbq_mix, bbq_blocks_calc) { } TEST_F(bbq_mix, ceil_log2) { - test_memory_counter_clear(); - uint32_t tmp = 0; uint32_t max = pow(2, 32) - 1; diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc index 8ab7276..0d29b1c 100644 --- a/bbq/tests/unittest/ut_multit.cc +++ b/bbq/tests/unittest/ut_multit.cc @@ -12,8 +12,8 @@ extern "C" { #include "ut.h" extern bool bbq_malloc_free_equal(); extern bool test_malloc_free_equal(); -extern void bbq_memory_print(); -bool bbq_check_array_bounds(struct bbq_s *q); +extern void bbq_debug_memory_print(); +bool bbq_debug_check_array_bounds(struct bbq *q); } class multit : public testing::Test { // 继承了 testing::Test @@ -89,6 +89,6 @@ TEST_F(multit, mpmc) { } test_free(TEST_MODULE_UTEST, exit_data); test_threads_destory(&test_info, threads); - EXPECT_TRUE(bbq_check_array_bounds((struct bbq_s *)q.ring)); + EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring)); test_queue_destory(&q); } diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt index 54801b1..40e8326 100644 --- a/perf/CMakeLists.txt +++ b/perf/CMakeLists.txt @@ -7,7 +7,7 @@ include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/../bbq/tests/common ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/iniparser ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/rmind_ringbuf - /home/admin/test/dpdk-23.07/build/install/include + /root/code/c/dpdk-21.11.4/install/include ) # 将bbq单元测试里的公共文件,添加到perf里。 @@ -34,7 +34,7 @@ set(EXEC_PATH ${OUTPUT_DIR}/bin) # 指定库路径 link_directories(${LIB_PATH}) link_directories(../bbq/build/output/lib/) -link_directories(/home/admin/test/dpdk-23.07/build/install/lib64) +link_directories(/root/code/c/dpdk-21.11.4/install/lib64 /root/code/c/dpdk-21.11.4/install/lib64/dpdk/pmds-22.0) # 可执行程序的名字 set(BENCHMARK_NAME benchmark) diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c index 923fa1f..26d75de 100644 --- a/perf/benchmark/bcm_benchmark.c +++ b/perf/benchmark/bcm_benchmark.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-14 17:34:15 + * @LastEditTime: 2024-06-18 18:20:02 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -16,7 +16,7 @@ #include #include -extern void bbq_memory_print(); +extern void bbq_debug_memory_print(); extern bool bbq_malloc_free_equal(); void bcm_report_printf(test_cfg *cfg, test_merge_data *data, test_exit_data **raw_data, uint32_t thread_cnt, test_thread_type_e ttype) { @@ -102,7 +102,7 @@ int main(int argc, char *argv[]) { } } else { config = "/root/code/c/bbq-ly/perf/benchmark/config/compare/case1_simple_spsc.ini"; - ring_type = "rmind"; + ring_type = "bbq"; burst_cnt = 16; TEST_ERR_LOG("use default config, ringt_type:%s burst:%u config:%s argc:%d", ring_type, burst_cnt, config, argc); } @@ -151,7 +151,7 @@ int main(int argc, char *argv[]) { test_free(TEST_MODULE_BCM, exit_data); test_threads_destory(&test_info, threads); test_queue_destory(&q); - bbq_memory_print(); + bbq_debug_memory_print(); test_memory_counter_print(); return 0; diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c index 0cd09e8..f6bce9d 100644 --- a/perf/benchmark/bcm_queue.c +++ b/perf/benchmark/bcm_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-14 17:31:56 + * @LastEditTime: 2024-06-21 18:07:20 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -8,14 +8,14 @@ #include "ringbuf.h" static __rte_always_inline unsigned int -bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx) { +bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { TEST_AVOID_WARNING(thread_idx); - return rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL); -} + unsigned int free_space = 0; + int ret = 0; -static __rte_always_inline unsigned int -bcm_dpdk_ring_dequeue_burst(struct rte_ring *r, void *obj_table, unsigned int n) { - return rte_ring_dequeue_burst(r, (void **)obj_table, n, NULL); + ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space); + *wait_consumed = r->size - free_space - 1; + return ret; } int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) { @@ -40,7 +40,7 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) { flags |= RING_F_MC_RTS_DEQ; } - q->ring = (void *)rte_ring_create("dpdk_ring", cfg->ring.entries_cnt, rte_socket_id(), RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ); + q->ring = (void *)rte_ring_create("dpdk_ring", cfg->ring.entries_cnt, rte_socket_id(), flags); if (q->ring == NULL) { return BBQ_NULL_PTR; } @@ -49,7 +49,7 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) { q->enqueue_f = (test_ring_enqueue_f)rte_ring_enqueue; q->dequeue_f = (test_ring_dequeue_f)rte_ring_dequeue; q->enqueue_burst_f = (test_enqueue_burst_f)bcm_dpdk_ring_enqueue_burst; - q->dequeue_burst_f = (test_dequeue_burst_f)bcm_dpdk_ring_dequeue_burst; + q->dequeue_burst_f = (test_dequeue_burst_f)rte_ring_dequeue_burst; return BBQ_OK; } @@ -68,7 +68,8 @@ void test_queue_free_rmind(void *ring) { test_free(TEST_MODULE_RMIND, ring); } -uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx) { +uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { + TEST_AVOID_WARNING(wait_consumed); uint32_t cnt = 0; int ret = 0; size_t off = 0; @@ -92,8 +93,9 @@ uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, uint32_t n, uint return cnt; } -uint32_t test_dequeue_burst_rmind(void *ring, void *obj_table, uint32_t n) { +uint32_t test_dequeue_burst_rmind(void *ring, void *obj_table, uint32_t n, uint32_t *wait_consumed) { TEST_AVOID_WARNING(n); + TEST_AVOID_WARNING(wait_consumed); size_t len = 0; size_t off = 0; size_t per_size = sizeof(void *); -- cgit v1.2.3