diff options
| author | liuyu <[email protected]> | 2024-06-27 05:13:48 -0400 |
|---|---|---|
| committer | liuyu <[email protected]> | 2024-06-27 05:13:48 -0400 |
| commit | 7f309a3257c04abbf20e5467d081da96413e4d21 (patch) | |
| tree | 371d7a4997b6a9be0c11ddafa8adaa49e931607f | |
| parent | cbaa973c4acad87f03d128bdbfe094f2e578f1f7 (diff) | |
编码风格、注释、接口调整、性能调整
| -rw-r--r-- | bbq/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | bbq/include/bbq.h | 170 | ||||
| -rw-r--r-- | bbq/src/bbq.c | 359 | ||||
| -rw-r--r-- | bbq/tests/common/test_mix.c | 2 | ||||
| -rw-r--r-- | bbq/tests/common/test_queue.c | 31 | ||||
| -rw-r--r-- | bbq/tests/common/test_queue.h | 4 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_example.cc | 32 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_head_cursor.cc | 54 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_mix.cc | 64 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_multit.cc | 3 | ||||
| -rw-r--r-- | perf/benchmark/bcm_benchmark.c | 6 | ||||
| -rw-r--r-- | perf/benchmark/bcm_queue.c | 8 |
12 files changed, 402 insertions, 335 deletions
diff --git a/bbq/CMakeLists.txt b/bbq/CMakeLists.txt index 77020a8..1242a45 100644 --- a/bbq/CMakeLists.txt +++ b/bbq/CMakeLists.txt @@ -19,6 +19,10 @@ endif() add_compile_options(-Wall -Wextra) +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + add_definitions(-DBBQ_MEMORY) +endif() + # 库生成的路径 set(LIB_PATH ${OUTPUT_DIR}/lib) # 测试程序生成的路径 diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index 56785fc..3b8269d 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: [email protected] - * @LastEditTime: 2024-06-24 10:17:39 + * @LastEditTime: 2024-06-27 03:04:19 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -33,52 +33,57 @@ struct bbq_block { bbq_cursor allocated; // 已分配(version|offset) bbq_cursor reserved; // 已预留(version|offset) bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char *entries; // 存储大小可变的entry,分配空间大小:bs * entry_size + char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; struct bbq { char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; - int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 - uint32_t bn; // blocks的个数 - uint32_t bs; // blocks.entries的个数 - uint32_t flags; // 标记:retry new 模式,还是drop old模式 - uint32_t idx_bits; // bbq_head里idx所占的位数 - uint32_t off_bits; // bbq_cursor里offset所占的位数 - uint64_t idx_mask; // idx_bits偏移后的掩码 - uint64_t off_mask; // off_bits偏移后的掩码 - uint64_t entry_size; // blocks.entries里每个entry的大小 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx - struct bbq_block *blocks; // bn大小的数组 -}; -#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ -// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */ -// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */ + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t bn; // blocks的个数 + uint32_t bs; // blocks.entries的个数 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + uint64_t entry_size; // blocks.entries里每个entry的大小 + bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + + struct bbq_block *blocks; // bn大小的数组 +} __BBQ_CACHE_ALIGNED; #define BBQ_F_DEFAULT 0x0 + +#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ #define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ -#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */ -#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */ /** * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 * 对应入队函数:bbq_enqueue、bbq_enqueue_burst * 对应出队函数:bbq_dequeue、bbq_dequeue_burst * * @param[in] name - * bbq名称 + * 队列名称 * @param[in] count - * 队列所有entry的个数,count必须大于1,且是2的N次方。 + * 队列大小,参数必须大于1,且是2的N次方。 * @param[in] socket_id * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 * 当检测到不支持多numa,将转为malloc分配内存。 * @param[in] flags * 设置入队策略: - * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 - * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * - BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 + * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * 设置生产者消费者模式: + * - BBQ_F_SP_ENQ:单生产者 BBQ_F_MP_ENQ:多生产者(默认) + * - BBQ_F_SC_DEQ:单消费者 BBQ_F_MC_DEQ:多消费者(默认) * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 - * NULL:创建失败。 + * NULL:创建失败,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围 + * - BBQ_ERR_ALLOC:申请内存失败 + * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 + * - BBQ_ERR_INPUT_NULL:name传入空指针 */ extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags); @@ -88,10 +93,14 @@ extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, u * @param[in] q * 队列指针 * @param[in] data - * 则传入一维指针,如: - * int *data = malloc(sizeof(int));*data = 1; 传入&data + * 指向入队指针的指针,如: + * int *data = malloc(sizeof(int));*data = TEST_DATA; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可能存在以下错误码: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_enqueue(struct bbq *q, void *const *data); @@ -101,15 +110,19 @@ extern int bbq_enqueue(struct bbq *q, void *const *data); * @param[in] q * 队列指针 * @param[out] data - * 则传入二维指针,如: + * 传入二级指针,如: * int *data = NULL; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_dequeue(struct bbq *q, void **data); /** - * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数 + * 消息队列批量入队(指针入队),尽可能一次入队n个指针,返回实际成功入队个数 * * @param[in] q * 队列指针 @@ -118,45 +131,52 @@ extern int bbq_dequeue(struct bbq *q, void **data); * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT); * for(int i=0;i<BUF_CNT;i++){ * obj_table[i] = malloc(sizeof(uint16_t)); - * obj_table[i] = 1; + * obj_table[i] = TEST_DATA; * } * 传入obj_table * @param[in] n * 尝试一次入队的个数 * @param[out] wait_consumed - * 如果为非NULL,返回当前队列中,已入队的个数。 + * 如果为非NULL,返回当前队列剩余的个数。注:该赋值可能会带来些许的性能损耗。 * @return - * 返回实际成功入队的个数 + * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed); /** - * 消息队列批量出队(指针出队),尽可能一次出队n个数据,返回实际成功出队个数 + * 消息队列批量指针出队,尽可能一次出队n个数据,返回实际成功出队个数 * * @param[in] q * 队列指针 * @param[out] obj_table - * 存储出队的指针,如: - * uint16_t **obj_table = malloc(sizeof(uint16_t *)) - * 传入obj_table + * 用于存储出队的指针,如: + * uint16_t **obj_table = malloc(sizeof(uint16_t *)); 传入obj_table * @param[in] n * 尝试一次出队的个数 * @param[out] wait_consumed - * 如果为非NULL,返回当前队列中,已入队的个数。 + * 如果为非NULL,返回当前队列中,已入队的个数。注:该赋值可能会带来些许的性能损耗 * @return - * 返回实际成功出队的个数 + * 返回实际成功出队的个数。当出队返回0时,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed); /** - * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针指向的数据拷贝入队。 + * 创建bbq队列,使用当前函数创建的队列,在后续操作会把指针指向的数据拷贝入队。 * 对应入队函数:bbq_enqueue_elem、bbq_enqueue_burst_elem * 对应出队函数:bbq_dequeue_elem、bbq_dequeue_burst_elem * * @param[in] name - * bbq名称 + * 队列名称 * @param[in] count - * 队列所有entry的个数,count必须大于1,且是2的N次方。 + * 队列大小,参数必须大于1,且是2的N次方。 * @param[in] socket_id * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 * 当检测到不支持多numa,将转为malloc分配内存。 @@ -166,7 +186,11 @@ extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, u * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 - * NULL:创建失败。 + * NULL:创建失败。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围 + * - BBQ_ERR_ALLOC:申请内存失败 + * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 + * - BBQ_ERR_INPUT_NULL:name传入空指针 */ extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags); @@ -176,9 +200,13 @@ extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_ * @param[in] q * 队列指针 * @param[in] data - * 传入一维指针,如:int data = 1; 传入&data + * 传入一级指针,如:int data = 1; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_enqueue_elem(struct bbq *q, void const *data); @@ -188,9 +216,13 @@ extern int bbq_enqueue_elem(struct bbq *q, void const *data); * @param[in] q * 队列指针 * @param[in] data - * 则传入一维指针,如:int data; 传入&data + * 则传入一级指针,如:int data; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_dequeue_elem(struct bbq *q, void *data); @@ -200,14 +232,18 @@ extern int bbq_dequeue_elem(struct bbq *q, void *data); * @param[in] q * 队列指针 * @param[in] obj_table - * 即将入队的数组,将数组里的每个成员入队,如: + * 将数组里的每个数据入队,如: * uint16_t obj_table[1024] = {初始化数据}; 传入obj_table * @param[in] n * 尝试一次入队的个数 * @param[out] wait_consumed - * 如果为非NULL,返回当前队列中,已入队的个数。 + * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗 * @return - * 返回实际成功入队个数 + * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed); @@ -222,9 +258,13 @@ extern uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uin * @param[in] n * 尝试一次出队的个数 * @param[out] wait_consumed - * 如果为非NULL,返回当前队列中,已入队的个数。 + * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗 * @return - * 返回实际成功出队个数 + * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed); @@ -255,15 +295,17 @@ extern void bbq_destory(struct bbq *q); */ extern void bbq_debug_struct_print(struct bbq *q); -// 通用返回码 -#define BBQ_OK 0 // 成功 -#define BBQ_ERROR -1 // 通用错误 -#define BBQ_ALLOC_ERR -2 // 内存分配失败 -#define BBQ_NULL_PTR -3 // 空指针 -#define BBQ_UNKNOWN_TYPE -3 // 未知类型 +// 错误码 +#define BBQ_OK 0 // 成功 +#define BBQ_ERR -1 // 通用错误,无法分类时使用 +#define BBQ_ERR_ALLOC -2 // 内存分配失败 +#define BBQ_ERR_INPUT_NULL -3 // 传入空指针 +#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方 +#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围 + +#define BBQ_ERR_FULL -101 // 队列已满(入队失败) +#define BBQ_ERR_BUSY -102 // 队列忙碌中(入队或出队失败) +#define BBQ_ERR_EMPTY -103 // 队列已空(出队失败) +#define BBQ_ERR_NOT_SUPPORT -104 // 不支持的操作 -// 队列错误 -#define BBQ_QUEUE_FULL -1001 // 队列已满(入队失败) -#define BBQ_QUEUE_BUSY -1002 // 队列忙碌中(入队或出队失败) -#define BBQ_QUEUE_EMPTY -1003 // 队列已空(出队失败) -#define BBQ_QUEUE_DATA_ERR -1004 // 传入的数据异常
\ No newline at end of file +extern __thread int32_t bbq_errno;
\ No newline at end of file diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index d5bdf03..5e63c51 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-24 10:11:24 + * @LastEditTime: 2024-06-26 04:59:28 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -12,12 +12,12 @@ #include <string.h> // flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针" -#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */ -#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ +#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */ +#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ // 判断flags标记位 #define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) -#define BBQ_F_CHK_VALUE(flags) (flags & BBQ_F_COPY_VALUE) +#define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE) // 避免无用参数的编译告警 #define AVOID_WARNING(param) ((void)param) @@ -27,6 +27,8 @@ printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \ } while (0) +__thread int32_t bbq_errno; + struct bbq_status { int32_t status; // 返回状态 uint32_t actual_burst; // 实际出/入队个数 @@ -42,46 +44,45 @@ enum bbq_queue_state { }; struct bbq_entry_desc { - uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释 - uint64_t off; // entry在当前block的偏移(offset) + uint64_t vsn; // allocated或reserved的版本(vsn) + uint64_t off; // entry在当前块的偏移(offset) uint32_t actual_burst; // 实际出/入队个数 - struct bbq_block *block; // 指向所在的block + struct bbq_block *block; // 指向所在的块 }; struct bbq_queue_state_s { - enum bbq_queue_state state; // 队列状态 - union { // TODO: - uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 + enum bbq_queue_state state; // 队列状态 + union { + uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效 struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效 }; }; -extern inline uint64_t bbq_idx(struct bbq *q, uint64_t x) { +extern inline uint64_t bbq_head_idx(struct bbq *q, uint64_t x) { return x & q->idx_mask; } -extern inline uint64_t bbq_off(struct bbq *q, uint64_t x) { - return x & q->off_mask; -} - extern inline uint64_t bbq_head_vsn(struct bbq *q, uint64_t x) { return x >> q->idx_bits; } +extern inline uint64_t bbq_cur_off(struct bbq *q, uint64_t x) { + return x & q->off_mask; +} + extern inline uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x) { return x >> q->off_bits; } -extern inline uint64_t set_cur_vsn(struct bbq *q, uint64_t ver) { +static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) { return ver << q->off_bits; } // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -// #define BBQ_MEMORY enum bbq_module { - BBQ_MODULE_QUEUE = 0, - BBQ_MODULE_QUEUE_BLOCK_NB, - BBQ_MODULE_QUEUE_BLOCK_ENTRY, + BBQ_MODULE_MAIN = 0, + BBQ_MODULE_BLOCK_NB, + BBQ_MODULE_BLOCK_ENTRY, BBQ_MODULE_MAX, }; @@ -132,7 +133,7 @@ static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t si } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) { +uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd) { uint64_t old_value = 0; do { old_value = atomic_load(atom); @@ -143,7 +144,7 @@ uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) { /* 检查参数是否为2的N次幂 */ bool bbq_check_power_of_two(uint32_t n) { - if (n <= 0) { + if (n == 0) { return false; } @@ -152,7 +153,7 @@ bool bbq_check_power_of_two(uint32_t n) { /* 根据entries大小返回合理的block个数 * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ -uint32_t bbq_blocks_calc(uint32_t entries) { +static uint32_t bbq_block_number_calc(uint32_t entries) { double log_entries = log2((double)entries); uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 uint32_t max_value = (over4 > 1) ? over4 : 1; @@ -160,22 +161,44 @@ uint32_t bbq_blocks_calc(uint32_t entries) { return n; } +int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) { + if (bn == NULL || bs == NULL) { + bbq_errno = BBQ_ERR_INPUT_NULL; + return bbq_errno; + } + + if (entries <= 1) { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; + return bbq_errno; + } + + if (bbq_check_power_of_two(entries) == false) { + bbq_errno = BBQ_ERR_POWER_OF_TWO; + return bbq_errno; + } + + *bn = bbq_block_number_calc(entries); + *bs = entries / *bn; + + return BBQ_OK; +} + /* 块初始化 */ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { #ifdef BBQ_MEMORY - // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 - block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, + // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题 + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); char *last_entry = block->entries + q->entry_size * q->bs; memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else - block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); #endif if (block->entries == NULL) { - BBQ_ERR_LOG("bbq_malloc error"); - return BBQ_ALLOC_ERR; + bbq_errno = BBQ_ERR_ALLOC; + return bbq_errno; } block->committed = ATOMIC_VAR_INIT(0); @@ -189,7 +212,7 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { block->allocated = ATOMIC_VAR_INIT(q->bs); block->reserved = ATOMIC_VAR_INIT(q->bs); if (BBQ_F_CHK_DROP_OLD(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(0); + block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed } else { block->consumed = ATOMIC_VAR_INIT(q->bs); } @@ -202,11 +225,11 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { void block_destory(struct bbq *q, struct bbq_block *block) { if (block->entries) { #ifdef BBQ_MEMORY - bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, - block->entries, sizeof(*block->entries) * (q->bs + 1) * q->entry_size); + bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, + block->entries, (q->bs + 1) * q->entry_size); #else - bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, - block->entries, sizeof(*block->entries) * q->bs * q->entry_size); + bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, + block->entries, q->bs * q->entry_size); #endif block->entries = NULL; } @@ -214,21 +237,18 @@ void block_destory(struct bbq *q, struct bbq_block *block) { /* 求x在二进制表示中最高位1所在的位置,x参数不能为0。 -例如:x=1,return 0 (...1) -x=3,return 1 (..11) -x=9,return 3 (1..1) +例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1) */ -unsigned floor_log2(uint64_t x) { - return x == 1 ? 0 : 1 + floor_log2(x >> 1); +static unsigned bbq_floor_log2(uint64_t x) { + return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1); } /* 返回以2为底x的对数,并向上取整值。 -例如:x=1,return 0 (2^0=1) -x=99, return 7(2^6=64 2^7=128) +例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128) */ -unsigned ceil_log2(uint64_t x) { - return x == 1 ? 0 : floor_log2(x - 1) + 1; +static unsigned bbq_ceil_log2(uint64_t x) { + return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1; } /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ @@ -236,33 +256,33 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int ret = 0; if (bbq_check_power_of_two(bn) == false) { - BBQ_ERR_LOG("block number is not power of two, now is :%u", bn); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } if (bbq_check_power_of_two(bs) == false) { - BBQ_ERR_LOG("block size is not power of two, now is :%u", bs); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - if (obj_size == 0) { - BBQ_ERR_LOG("obj_size is 0"); + if (name == NULL) { + bbq_errno = BBQ_ERR_INPUT_NULL; return NULL; } - if (name == NULL || strlen(name) >= BBQ_SYMBOL_MAX - 1) { - BBQ_ERR_LOG("invalid bbq name, max len is %d", BBQ_SYMBOL_MAX - 1); + if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0) { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; return NULL; } if (numa_available() < 0) { - // 不支持numa,设置 + // 不支持numa socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); + struct bbq *q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q)); if (q == NULL) { - BBQ_ERR_LOG("malloc for bbq queue error"); + bbq_errno = BBQ_ERR_ALLOC; return NULL; } memset(q, 0, sizeof(*q)); @@ -275,25 +295,22 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, q->chead = ATOMIC_VAR_INIT(0); q->flags = flags; - q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); if (q->blocks == NULL) { - BBQ_ERR_LOG("bbq malloc for blocks error"); + bbq_errno = BBQ_ERR_ALLOC; goto error; } memset(q->blocks, 0, sizeof(*q->blocks)); for (uint32_t i = 0; i < bn; ++i) { - // 第一个block不需要设置cursor_init - bool cursor_init = (i == 0 ? false : true); - ret = block_init(q, &(q->blocks[i]), cursor_init); + ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true)); if (ret != BBQ_OK) { - BBQ_ERR_LOG("bbq block init error"); goto error; } } - q->idx_bits = ceil_log2(bn); - q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 + q->idx_bits = bbq_ceil_log2(bn); + q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 TODO:bs占位少的时候多分一些? q->idx_mask = (1 << q->idx_bits) - 1; q->off_mask = (1 << q->off_bits) - 1; @@ -305,37 +322,41 @@ error: return NULL; } -struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { +/* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */ +struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { + bbq_errno = BBQ_OK; return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); } -struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { +/* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */ +struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { + bbq_errno = BBQ_OK; return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) { - if (bbq_check_power_of_two(count) == false || count == 1) { - BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + bbq_errno = BBQ_OK; + uint32_t bn = 0; + uint32_t bs = 0; + + if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) { return NULL; } - uint32_t bn = bbq_blocks_calc(count); - uint32_t bs = count / bn; - - return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); } struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) { - if (bbq_check_power_of_two(count) == false || count == 1) { - BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + bbq_errno = BBQ_OK; + uint32_t bn = 0; + uint32_t bs = 0; + + if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) { return NULL; } - uint32_t bn = bbq_blocks_calc(count); - uint32_t bs = count / bn; - - return bbq_create_bnbs(name, bn, bs, socket_id, flags); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ @@ -348,8 +369,8 @@ void bbq_destory(struct bbq *q) { block_destory(q, &(q->blocks[i])); } - bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); - bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q)); + bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); + bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q)); } #define BBQ_DATA_TYPE_SINGLE 0x0 @@ -358,7 +379,7 @@ void bbq_destory(struct bbq *q) { void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) { size_t idx = e->off * q->entry_size; - if (BBQ_F_CHK_VALUE(q->flags)) { + if (BBQ_F_CHK_COPY_VALUE(q->flags)) { // 数据入队列 switch (data_type) { case BBQ_DATA_TYPE_ARRAY_1D: @@ -376,6 +397,7 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin break; } default: + bbq_errno = BBQ_ERR; break; } } else { @@ -387,8 +409,8 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); break; case BBQ_DATA_TYPE_ARRAY_1D: - break; default: + bbq_errno = BBQ_ERR; break; } } @@ -397,17 +419,17 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { struct bbq_queue_state_s state = {0}; - if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) { + if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } uint64_t old = atomic_fetch_add(&block->allocated, n); + // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed)); - // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 uint64_t cur_vsn = bbq_cur_vsn(q, old); - uint64_t cur_off = bbq_off(q, old); + uint64_t cur_off = bbq_cur_off(q, old); if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) { state.state = BBQ_BLOCK_DONE; return state; @@ -429,27 +451,28 @@ struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, } enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { - // 获取下一个block uint64_t cur = 0; - struct bbq_block *n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); + // 获取下一个block + struct bbq_block *n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]); uint64_t ph_vsn = bbq_head_vsn(q, ph); if (BBQ_F_CHK_DROP_OLD(q->flags)) { cur = atomic_load(&n_blk->committed); - // 生产者避免前进到上一轮中尚未完全提交的区块 - if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) { + // 生产者head避免覆盖上一轮尚未完全提交的区块 + if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; } } else { cur = atomic_load(&n_blk->consumed); uint64_t reserved; - uint64_t consumed_off = bbq_off(q, cur); + uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); - if (consumed_vsn < ph_vsn || // 生产者赶上了消费者 + if (consumed_vsn < ph_vsn || (consumed_vsn == ph_vsn && consumed_off != q->bs)) { + // 生产者赶上了消费者 reserved = atomic_load(&n_blk->reserved); - if (bbq_off(q, reserved) == consumed_off) { + if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; } else { return BBQ_NOT_AVAILABLE; @@ -457,19 +480,20 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { } } - // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。 - uint64_t new_vsn = set_cur_vsn(q, ph_vsn + 1); - fetch_max(&n_blk->committed, new_vsn); - fetch_max(&n_blk->allocated, new_vsn); + // 用head的version初始化下一个块 + // version在高位,version+1,index或offset也会被清零,如果没有被其他线程执行过。多线程同时只更新一次。 + uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1); + bbq_fetch_max(&n_blk->committed, new_vsn); + bbq_fetch_max(&n_blk->allocated, new_vsn); - // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1 - fetch_max(&q->phead, ph + 1); + // ph+1,当超过索引范围,进入下一轮时,version会自动+1 + bbq_fetch_max(&q->phead, ph + 1); return BBQ_SUCCESS; } static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t *ph_ptr, struct bbq_block *blk_ph) { - uint64_t ch; - uint64_t ph; + uint64_t ch = 0; + uint64_t ph = 0; if (ch_ptr != NULL) { ch = *ch_ptr; } else { @@ -482,14 +506,14 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t ph = atomic_load(&q->phead); } - uint64_t ph_idx = bbq_idx(q, ph); - uint64_t ch_idx = bbq_idx(q, ch); - uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed)); + uint64_t ph_idx = bbq_head_idx(q, ph); + uint64_t ch_idx = bbq_head_idx(q, ch); + uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed)); - struct bbq_block *blk_ch = &(q->blocks[bbq_idx(q, ch)]); - uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved)); + struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); + uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved)); - // "生产者"超过"消费者"的索引(即块)的个数 + // "生产者"超过"消费者"块的个数 uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; if (!BBQ_F_CHK_DROP_OLD(q->flags)) { // 这里idx_diff-1=-1也是正确。 @@ -500,36 +524,37 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t uint64_t ph_vsn = bbq_head_vsn(q, ph); if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) { + // drop old模式,未发生覆盖 return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); } - // 发生了覆盖 if (ph_idx == ch_idx) { - // 当前块以及之前已生产的都作废 + // drop old模式,发生了覆盖,当前块以及之前已生产的都作废 return 0; } return (idx_diff - 1) * q->bs + committed_off; } +//----------------------------------------------------------------- /* 消息队列入队 */ -static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t flag, uint32_t *wait_consumed) { +static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || data == NULL) { - ret.status = BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + ret.status = bbq_errno; return ret; } while (true) { - // 获取当前phead,转为索引后获取到当前的blk uint64_t ph = atomic_load(&q->phead); - struct bbq_block *blk = &(q->blocks[bbq_idx(q, ph)]); + struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ph)]); struct bbq_queue_state_s state = allocate_entry(q, blk, n); switch (state.state) { case BBQ_ALLOCATED: - commit_entry(q, &state.e, data, flag); + commit_entry(q, &state.e, data, data_type); ret.actual_burst = state.e.actual_burst; ret.status = BBQ_OK; break; @@ -540,17 +565,21 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } if (pstate == BBQ_NO_ENTRY) { - ret.status = BBQ_QUEUE_FULL; + bbq_errno = BBQ_ERR_FULL; + ret.status = bbq_errno; } else if (pstate == BBQ_NOT_AVAILABLE) { - ret.status = BBQ_QUEUE_BUSY; + bbq_errno = BBQ_ERR_BUSY; + ret.status = bbq_errno; } else { - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; } break; } default: - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; break; } @@ -563,21 +592,23 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } int bbq_enqueue(struct bbq *q, void *const *data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } int bbq_enqueue_elem(struct bbq *q, void const *data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } /* 更新成功 reserve成功的个数 */ -uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { +uint32_t bbq_reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { // TODO:逻辑可以合并 if (n == 1) { // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (fetch_max(aotmic, reserved + 1) == reserved) { + if (bbq_fetch_max(aotmic, reserved + 1) == reserved) { return 1; } @@ -588,11 +619,11 @@ uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { } } -struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { +struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { while (true) { struct bbq_queue_state_s state; uint64_t reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) { @@ -607,7 +638,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, u } uint64_t committed = atomic_load(&block->committed); - uint64_t committed_off = bbq_off(q, committed); + uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; return state; @@ -616,14 +647,14 @@ struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, u // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 if (committed_off != q->bs) { uint64_t allocated = atomic_load(&block->allocated); - if (bbq_off(q, allocated) != committed_off) { + if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; } } uint32_t tmp = committed_off - reserved_off; - uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); if (reserved_cnt > 0) { // TODO:多entry时关注 state.state = BBQ_RESERVED; state.e.actual_burst = reserved_cnt; @@ -647,7 +678,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, u bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) { size_t idx = e->off * q->entry_size; - if (BBQ_F_CHK_VALUE(q->flags)) { + if (BBQ_F_CHK_COPY_VALUE(q->flags)) { switch (data_type) { case BBQ_DATA_TYPE_ARRAY_1D: case BBQ_DATA_TYPE_SINGLE: @@ -694,7 +725,7 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint } bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { - uint64_t ch_idx = bbq_idx(q, ch); + uint64_t ch_idx = bbq_head_idx(q, ch); struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); uint64_t ch_vsn = bbq_head_vsn(q, ch); @@ -705,18 +736,18 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 if (committed_vsn < ver + (ch_idx == 0)) return false; - fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn)); + bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn)); } else { if (committed_vsn != ch_vsn + 1) { // 消费者追上了生产者,下一块还未开始生产 return false; } - uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1); - fetch_max(&n_blk->consumed, new_vsn); - fetch_max(&n_blk->reserved, new_vsn); + uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1); + bbq_fetch_max(&n_blk->consumed, new_vsn); + bbq_fetch_max(&n_blk->reserved, new_vsn); } - fetch_max(&q->chead, ch + 1); + bbq_fetch_max(&q->chead, ch + 1); return true; } @@ -724,15 +755,16 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { - ret.status = BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + ret.status = bbq_errno; return ret; } while (true) { uint64_t ch = atomic_load(&q->chead); - struct bbq_block *blk = &(q->blocks[bbq_idx(q, ch)]); + struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; - state = reserve_entry(q, blk, n); + state = bbq_reserve_entry(q, blk, n); switch (state.state) { case BBQ_RESERVED: @@ -743,19 +775,23 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n ret.actual_burst = state.e.actual_burst; break; case BBQ_NO_ENTRY: - ret.status = BBQ_QUEUE_EMPTY; + bbq_errno = BBQ_ERR_EMPTY; + ret.status = bbq_errno; break; case BBQ_NOT_AVAILABLE: - ret.status = BBQ_QUEUE_BUSY; + bbq_errno = BBQ_ERR_BUSY; + ret.status = bbq_errno; break; case BBQ_BLOCK_DONE: if (advance_chead(q, ch, state.vsn)) { continue; } - ret.status = BBQ_QUEUE_EMPTY; + bbq_errno = BBQ_ERR_EMPTY; + ret.status = bbq_errno; break; default: - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; break; } @@ -768,16 +804,18 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } int bbq_dequeue(struct bbq *q, void **data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } int bbq_dequeue_elem(struct bbq *q, void *data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } -uint32_t bbq_max_burst(struct bbq *q, uint32_t n) { +static uint32_t bbq_max_burst(struct bbq *q, uint32_t n) { uint32_t burst = n; if (burst > q->bs) { burst = q->bs; @@ -788,11 +826,13 @@ uint32_t bbq_max_burst(struct bbq *q, uint32_t n) { static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } - if (!BBQ_F_CHK_VALUE(q->flags)) { - return BBQ_QUEUE_DATA_ERR; + if (!BBQ_F_CHK_COPY_VALUE(q->flags)) { + bbq_errno = BBQ_ERR_NOT_SUPPORT; + return 0; } uint32_t burst = 0; @@ -815,7 +855,8 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq *q, void *obj_table static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } uint32_t burst = 0; @@ -837,13 +878,15 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq *q, void **obj_tabl } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { +static uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } - if (!BBQ_F_CHK_VALUE(q->flags)) { - return BBQ_QUEUE_DATA_ERR; + if (!BBQ_F_CHK_COPY_VALUE(q->flags)) { + bbq_errno = BBQ_ERR_NOT_SUPPORT; + return 0; } uint32_t burst = 0; @@ -865,9 +908,10 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { +static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } uint32_t burst = 0; @@ -894,8 +938,8 @@ bool bbq_empty(struct bbq *q) { uint64_t ph_vsn = bbq_head_vsn(q, phead); uint64_t ch_vsn = bbq_head_vsn(q, chead); - uint64_t ph_idx = bbq_idx(q, phead); - uint64_t ch_idx = bbq_idx(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); struct bbq_block *block; @@ -905,13 +949,13 @@ bool bbq_empty(struct bbq *q) { block = &q->blocks[ph_idx]; if (ph_vsn == ch_vsn) { - if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) { + if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) { return true; } } bbq_cursor reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); if (BBQ_F_CHK_DROP_OLD(q->flags) && ph_vsn > ch_vsn && @@ -925,22 +969,27 @@ bool bbq_empty(struct bbq *q) { } uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { + bbq_errno = BBQ_OK; return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed); } uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + bbq_errno = BBQ_OK; return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + bbq_errno = BBQ_OK; return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) { + bbq_errno = BBQ_OK; return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed); } uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) { + bbq_errno = BBQ_OK; return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed); } @@ -1014,11 +1063,11 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { bbq_cursor reserved = atomic_load(&block->reserved); bbq_cursor consumed = atomic_load(&block->consumed); printf(" allocated:%lu committed:%lu reserved:%lu", - bbq_off(q, allocated), bbq_off(q, committed), bbq_off(q, reserved)); + bbq_cur_off(q, allocated), bbq_cur_off(q, committed), bbq_cur_off(q, reserved)); if (BBQ_F_CHK_DROP_OLD(q->flags)) { printf("\n"); } else { - printf(" consumed:%lu\n", bbq_off(q, consumed)); + printf(" consumed:%lu\n", bbq_cur_off(q, consumed)); } } @@ -1027,17 +1076,17 @@ void bbq_debug_struct_print(struct bbq *q) { uint64_t phead = atomic_load(&q->phead); uint64_t chead = atomic_load(&q->chead); - printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs); - printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead)); + printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); + printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); - uint64_t ph_idx = bbq_idx(q, phead); - uint64_t ch_idx = bbq_idx(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); if (ph_idx != ch_idx) { printf("block[%lu]\n", ph_idx); bbq_debug_block_print(q, &(q->blocks[ph_idx])); } - printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead)); + printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead)); printf("block[%lu]\n", ch_idx); bbq_debug_block_print(q, &(q->blocks[ch_idx])); }
\ No newline at end of file diff --git a/bbq/tests/common/test_mix.c b/bbq/tests/common/test_mix.c index bcadaa5..28468f1 100644 --- a/bbq/tests/common/test_mix.c +++ b/bbq/tests/common/test_mix.c @@ -162,7 +162,7 @@ int test_setaffinity(int core_id) { if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) == -1) { fprintf(stderr, "pthread_setaffinity_np erro\n"); - return BBQ_ERROR; + return BBQ_ERR; } return BBQ_OK; diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c index f4726d9..0934285 100644 --- a/bbq/tests/common/test_queue.c +++ b/bbq/tests/common/test_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-24 10:16:28 + * @LastEditTime: 2024-06-27 02:50:17 * @Email: [email protected] * @Describe: TODO */ @@ -11,8 +11,8 @@ #include <sys/prctl.h> #include <unistd.h> extern bool bbq_debug_check_array_bounds(struct bbq *q); -extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); -extern struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags); uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { TEST_AVOID_WARNING(thread_idx); @@ -23,14 +23,14 @@ int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { if (cfg->ring.block_count == 0) { q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); } else { - q->ring = bbq_create_bnbs("test_bbq", cfg->ring.block_count, - cfg->ring.entries_cnt / cfg->ring.block_count, - BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q->ring = bbq_create_with_bnbs("test_bbq", cfg->ring.block_count, + cfg->ring.entries_cnt / cfg->ring.block_count, + BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); } if (q->ring == NULL) { TEST_ERR_LOG("bbq create queue failed"); - return BBQ_NULL_PTR; + return BBQ_ERR_INPUT_NULL; } q->ring_free_f = (test_ring_free_f)bbq_destory; @@ -104,10 +104,17 @@ void test_data_destory(test_data **data, size_t cnt) { test_free(TEST_MODULE_DATA, data); } -uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) { +uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, + test_time_metric *op_use_diff, uint16_t thread_idx) { uint32_t enqueue_cnt = 0; test_time_metric op_use_start = test_clock_time_get(); +#if 0 + // wait_consumed会导致bbq损失部分性能 + uint32_t wait_consumed = 0; + enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, &wait_consumed); +#else enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, NULL); +#endif *op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start); return enqueue_cnt; @@ -138,7 +145,6 @@ void *test_thread_producer_start(void *arg) { uint64_t op_err_latency_ns = 0; uint64_t run_ok_times = cfg->run.run_ok_times / cfg->ring.producer_cnt; test_time_metric op_latency = {0}; - snprintf(thread_name, sizeof(thread_name), "producer:%lu", exit_data->thread_id); prctl(PR_SET_NAME, thread_name); if (test_setaffinity(t_arg->core) != BBQ_OK) { @@ -222,13 +228,10 @@ void *test_thread_consumer_start(void *arg) { exit_data->metric_start = test_clock_time_get(); - uint32_t last_times = cfg->ring.entries_cnt; while (true) { - if (!test_info->ctl.running || test_all_producer_exit(test_info)) { + if (test_all_producer_exit(test_info) && deq_cnt == 0) { // 运行时间到了或是所有生产者退出了,检查生产者是否全部退出,且队列被消费完了 - if (deq_cnt == 0 && (last_times-- <= 0)) { - break; - } + break; } deq_cnt = test_exec_dequeue(q, deq_data, cfg->ring.burst_cnt, &op_latency); diff --git a/bbq/tests/common/test_queue.h b/bbq/tests/common/test_queue.h index 8d40e92..9edefdb 100644 --- a/bbq/tests/common/test_queue.h +++ b/bbq/tests/common/test_queue.h @@ -91,8 +91,8 @@ extern void test_wait_all_threads_ready(test_ctl *ctl); extern void test_queue_destory(test_queue_s *q); extern int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q); extern void test_merge_all_data(test_exit_data **exit_data, uint32_t thread_cnt, test_merge_s *merge); -extern uint64_t bbq_idx(struct bbq *q, uint64_t x); -extern uint64_t bbq_off(struct bbq *q, uint64_t x); +extern uint64_t bbq_head_idx(struct bbq *q, uint64_t x); +extern uint64_t bbq_cur_off(struct bbq *q, uint64_t x); extern uint64_t bbq_head_vsn(struct bbq *q, uint64_t x); extern uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x); extern test_data **test_data_create(size_t cnt); diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc index 64ee3f6..0439612 100644 --- a/bbq/tests/unittest/ut_example.cc +++ b/bbq/tests/unittest/ut_example.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-24 10:13:21 + * @LastEditTime: 2024-06-25 11:40:27 * @Email: [email protected] * @Describe: 简单的测试用例,测试基本功能 */ @@ -11,7 +11,6 @@ extern "C" { #include "test_queue.h" #include "ut.h" extern bool bbq_malloc_free_equal(); -extern void bbq_debug_memory_print(); extern bool bbq_debug_check_array_bounds(struct bbq *q); extern void bbq_struct_print(struct bbq *q); extern uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed); @@ -57,7 +56,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) { ASSERT_NE(q, nullptr); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY); // 全部入队成功 for (uint32_t i = 0; i < 4000; i++) { @@ -88,7 +87,8 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) { EXPECT_EQ(cnt, BUF_CNT); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY); + bbq_destory(q); } TEST_F(bbq_example, single_retry_new_cp_value) { @@ -101,7 +101,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) { ASSERT_NE(q, nullptr); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY); // 全部入队成功 for (uint32_t i = 0; i < 4000; i++) { @@ -132,7 +132,8 @@ TEST_F(bbq_example, single_retry_new_cp_value) { EXPECT_EQ(cnt, BUF_CNT); // 空队出队失败 - EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY); + bbq_destory(q); } TEST_F(bbq_example, single_drop_old_cp_pointer) { @@ -148,7 +149,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { EXPECT_LT(second_cnt, q->bs * q->bn); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY); // 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈) uint32_t loop = 3; @@ -184,7 +185,9 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { EXPECT_EQ(cnt, second_cnt - q->bs); // 空队出队失败 - EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY); + + bbq_destory(q); } TEST_F(bbq_example, single_drop_old_cp_value) { @@ -200,7 +203,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) { EXPECT_LT(second_cnt, q->bs * q->bn); // 空队出队失败 - EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY); // 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈) uint32_t loop = 3; @@ -234,9 +237,10 @@ TEST_F(bbq_example, single_drop_old_cp_value) { // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。 EXPECT_EQ(cnt, second_cnt - q->bs); - // 空队出队失败 - EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY); + EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY); + + bbq_destory(q); } TEST_F(bbq_example, burst_retry_new_cp_value) { @@ -285,8 +289,8 @@ TEST_F(bbq_example, burst_retry_new_cp_value) { } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); - bbq_destory(q); test_free(TEST_MODULE_DATA, deq_table2); + bbq_destory(q); } TEST_F(bbq_example, burst_retry_new_cp_pointer) { @@ -334,8 +338,8 @@ TEST_F(bbq_example, burst_retry_new_cp_pointer) { } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); - bbq_destory(q); test_free(TEST_MODULE_DATA, deq_table2); + bbq_destory(q); } TEST_F(bbq_example, burst_drop_old_cp_pointer) { @@ -384,8 +388,8 @@ TEST_F(bbq_example, burst_drop_old_cp_pointer) { } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); - bbq_destory(q); test_free(TEST_MODULE_DATA, deq_table2); + bbq_destory(q); } TEST_F(bbq_example, burst_drop_old_cp_value) { diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc index b8bff37..e3b3b50 100644 --- a/bbq/tests/unittest/ut_head_cursor.cc +++ b/bbq/tests/unittest/ut_head_cursor.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-24 10:15:57 + * @LastEditTime: 2024-06-25 11:42:49 * @Email: [email protected] * @Describe: TODO */ @@ -9,9 +9,8 @@ extern "C" { #include "test_queue.h" #include "ut.h" extern bool bbq_malloc_free_equal(); -extern void bbq_debug_memory_print(); extern bool bbq_debug_check_array_bounds(struct bbq *q); -extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); } class bbq_head_cursor : public testing::Test { // 继承了 testing::Test @@ -31,32 +30,32 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test }; void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - EXPECT_EQ(bbq_idx(q, q->phead), idx) << "line: " << line; + EXPECT_EQ(bbq_head_idx(q, q->phead), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line; } void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - EXPECT_EQ(bbq_idx(q, q->chead), idx) << "line: " << line; + EXPECT_EQ(bbq_head_idx(q, q->chead), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line; } void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_off(q, block->allocated), off) << "line: " << line; + EXPECT_EQ(bbq_cur_off(q, block->allocated), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line; } void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_off(q, block->committed), off) << "line: " << line; + EXPECT_EQ(bbq_cur_off(q, block->committed), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line; } void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_off(q, block->consumed), off) << "line: " << line; + EXPECT_EQ(bbq_cur_off(q, block->consumed), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line; } void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_off(q, block->reserved), off) << "line: " << line; + EXPECT_EQ(bbq_cur_off(q, block->reserved), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line; } @@ -65,7 +64,7 @@ TEST_F(bbq_head_cursor, init) { struct bbq *q; uint32_t bn = 2; uint32_t bs = 4; - q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); ASSERT_NE(q, nullptr); // 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目 @@ -84,8 +83,6 @@ TEST_F(bbq_head_cursor, init) { } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); bbq_destory(q); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } void ut_produce_something(uint32_t produce_cnt) { @@ -99,7 +96,7 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_GT(produce_cnt, 0); EXPECT_LE(produce_cnt, bs); - q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); ASSERT_NE(q, nullptr); // 生产produce_cnt @@ -144,8 +141,6 @@ TEST_F(bbq_head_cursor, produce_something) { ut_produce_something(567); ut_produce_something(789); ut_produce_something(4096); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } void ut_produce_next_block(uint32_t over) { @@ -160,7 +155,7 @@ void ut_produce_next_block(uint32_t over) { EXPECT_GT(over, 0); EXPECT_LT(over, bs); - q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); ASSERT_NE(q, nullptr); // 生产至第二块的第一个entry @@ -210,8 +205,6 @@ TEST_F(bbq_head_cursor, produce_next_block) { ut_produce_next_block(123); ut_produce_next_block(456); ut_produce_next_block(4095); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } void ut_produce_all_loop(uint32_t loop) { @@ -223,7 +216,7 @@ void ut_produce_all_loop(uint32_t loop) { int enqueue_data = TEST_DATA_MAGIC; int dequeue_data = 0; - q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); ASSERT_NE(q, nullptr); for (uint32_t cnt = 0; cnt < loop; cnt++) { @@ -266,8 +259,6 @@ TEST_F(bbq_head_cursor, produce_all_loop) { ut_produce_all_loop(10); ut_produce_all_loop(23); ut_produce_all_loop(79); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } TEST_F(bbq_head_cursor, retry_new_full_empty) { @@ -296,7 +287,7 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { // 满队再入队 for (uint32_t j = 0; j < entries_cnt / 3; j++) { ret = bbq_enqueue_elem(q, &data[j]); - EXPECT_TRUE(ret == BBQ_QUEUE_FULL); + EXPECT_TRUE(ret == BBQ_ERR_FULL); } if (i == 0) { @@ -321,7 +312,7 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { // 空出队再出队 for (uint32_t j = 0; j < entries_cnt / 2; j++) { ret = bbq_dequeue_elem(q, &tmp_data); - EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY); + EXPECT_TRUE(ret == BBQ_ERR_EMPTY); } EXPECT_EQ(q->phead.load() & q->idx_mask, q->chead.load() & q->idx_mask); @@ -336,8 +327,6 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { test_free(TEST_MODULE_UTEST, data); EXPECT_TRUE(bbq_debug_check_array_bounds(q)); bbq_destory(q); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } TEST_F(bbq_head_cursor, mpsc_faa) { @@ -402,8 +391,6 @@ TEST_F(bbq_head_cursor, mpsc_faa) { test_threads_destory(&test_info, threads); EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring)); test_queue_destory(&q); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } TEST_F(bbq_head_cursor, drop_old_full_empty) { @@ -414,7 +401,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) { struct bbq *q; int tmp_data = 0; - q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); ASSERT_NE(q, nullptr); EXPECT_TRUE(bbq_empty(q)); @@ -438,7 +425,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) { // 空队再出队,失败 for (uint32_t i = 0; i < bn * bs; i++) { ret = bbq_dequeue_elem(q, &tmp_data); - EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret; + EXPECT_TRUE(ret == BBQ_ERR_EMPTY) << "ret " << ret; } expect_phead(q, bn - 1, j, __LINE__); @@ -452,9 +439,6 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) { } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); bbq_destory(q); - - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { @@ -468,7 +452,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { EXPECT_EQ(over_cnt / bs, 1); int tmp_data = 0; - q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); ASSERT_NE(q, nullptr); EXPECT_TRUE(bbq_empty(q)); @@ -514,7 +498,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { for (uint32_t i = 0; i < bn * bs; i++) { EXPECT_TRUE(bbq_empty(q)); ret = bbq_dequeue_elem(q, &tmp_data); - EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret; + EXPECT_TRUE(ret == BBQ_ERR_EMPTY) << "ret " << ret; } expect_chead(q, 1, 0, __LINE__); @@ -535,6 +519,4 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { EXPECT_TRUE(bbq_debug_check_array_bounds(q)); bbq_destory(q); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); }
\ No newline at end of file diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc index daf6c97..fa2f07f 100644 --- a/bbq/tests/unittest/ut_mix.cc +++ b/bbq/tests/unittest/ut_mix.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-19 22:49:24 + * @LastEditTime: 2024-06-26 03:37:56 * @Email: [email protected] * @Describe: bbq除了队列操作外,其他函数的测试 */ @@ -10,11 +10,11 @@ extern "C" { #include "ut.h" #include <math.h> extern bool bbq_check_power_of_two(int n); -extern uint32_t bbq_blocks_calc(uint32_t entries); -extern unsigned ceil_log2(uint64_t x); -extern uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd); +extern unsigned bbq_ceil_log2(uint64_t x); +extern uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd); extern bool bbq_malloc_free_equal(); extern bool test_malloc_free_equal(); +extern int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs); } class bbq_mix : public testing::Test { // 继承了 testing::Test @@ -48,17 +48,17 @@ void *fetch_max_thread_func(void *arg) { uint64_t *ret = (uint64_t *)test_malloc(TEST_MODULE_UTEST, sizeof(*ret)); // 不同线程写入不同的>3的数 - *ret = fetch_max(&fetch_arg->data, pthread_self() + 3); + *ret = bbq_fetch_max(&fetch_arg->data, pthread_self() + 3); pthread_exit(ret); } -TEST_F(bbq_mix, fetch_max) { +TEST_F(bbq_mix, bbq_fetch_max) { uint64_t ret = 0; ut_fetch_arg arg = {}; arg.data.store(1); // 初始化1 arg.thread_cnt = 50; - ret = fetch_max(&arg.data, 2); // max比较后设置为2 + ret = bbq_fetch_max(&arg.data, 2); // max比较后设置为2 EXPECT_EQ(arg.data.load(), 2); EXPECT_EQ(ret, 1); @@ -80,8 +80,6 @@ TEST_F(bbq_mix, fetch_max) { // EXPECT_EQ(eq_cnt, 1); test_free(TEST_MODULE_UTEST, threads); - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } TEST_F(bbq_mix, power_of_two) { @@ -105,30 +103,34 @@ TEST_F(bbq_mix, power_of_two) { break; } } - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } -TEST_F(bbq_mix, bbq_blocks_calc) { - uint32_t tmp = 0; +TEST_F(bbq_mix, bbq_block_number_calc) { + uint32_t tmp = 2; uint32_t max = pow(2, 32) - 1; + uint32_t bn = 0, bs = 0; + int ret = 0; - tmp = 2; - for (uint32_t val = 1; val < max; val *= tmp) { + ret = bbq_bnbs_calc(1, &bn, &bs); + EXPECT_EQ(ret, BBQ_ERR_OUT_OF_RANGE); + + for (uint32_t val = 2; val < max; val *= tmp) { + ret = bbq_bnbs_calc(val, &bn, &bs); + EXPECT_EQ(ret, BBQ_OK); if (val <= 128) { - EXPECT_TRUE(bbq_blocks_calc(val) == 2); + EXPECT_EQ(bn, 2); } else if (val <= 2048) { - EXPECT_TRUE(bbq_blocks_calc(val) == 4); + EXPECT_EQ(bn, 4); } else if (val <= 32768) { - EXPECT_TRUE(bbq_blocks_calc(val) == 8); + EXPECT_EQ(bn, 8); } else if (val <= 524288) { - EXPECT_TRUE(bbq_blocks_calc(val) == 16); + EXPECT_EQ(bn, 16); } else if (val <= 8388608) { - EXPECT_TRUE(bbq_blocks_calc(val) == 32); + EXPECT_EQ(bn, 32); } else if (val <= 134217728) { - EXPECT_TRUE(bbq_blocks_calc(val) == 64); + EXPECT_EQ(bn, 64); } else if (val <= 2147483648) { - EXPECT_TRUE(bbq_blocks_calc(val) == 128); + EXPECT_EQ(bn, 128); } else { EXPECT_TRUE(0); // 异常 } @@ -137,22 +139,4 @@ TEST_F(bbq_mix, bbq_blocks_calc) { break; } } - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); -} - -TEST_F(bbq_mix, ceil_log2) { - uint32_t tmp = 0; - uint32_t max = pow(2, 32) - 1; - - tmp = 2; - for (uint32_t val = 1, bbq_idx = 0; val < max; val *= tmp, bbq_idx++) { - EXPECT_TRUE(bbq_idx == ceil_log2(val)); - if (val >= max / tmp) { - break; - } - EXPECT_TRUE(log2(val) == ceil_log2(val)); - } - EXPECT_TRUE(bbq_malloc_free_equal()); - EXPECT_TRUE(test_malloc_free_equal()); } diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc index 0d29b1c..bbe963b 100644 --- a/bbq/tests/unittest/ut_multit.cc +++ b/bbq/tests/unittest/ut_multit.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-17 11:07:02 + * @LastEditTime: 2024-06-25 11:30:59 * @Email: [email protected] * @Describe: TODO */ @@ -12,7 +12,6 @@ extern "C" { #include "ut.h" extern bool bbq_malloc_free_equal(); extern bool test_malloc_free_equal(); -extern void bbq_debug_memory_print(); bool bbq_debug_check_array_bounds(struct bbq *q); } diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c index 26d75de..801cd3a 100644 --- a/perf/benchmark/bcm_benchmark.c +++ b/perf/benchmark/bcm_benchmark.c @@ -39,12 +39,12 @@ void bcm_report_printf(test_cfg *cfg, test_merge_data *data, test_exit_data **ra // 多生产者单消费者 或 单生产者多消费才输出 if ((cfg->ring.producer_cnt == 1 && cfg->ring.consumer_cnt > 1) || (cfg->ring.producer_cnt > 1 && cfg->ring.consumer_cnt == 1)) { - for (uint32_t i = 0, bbq_idx = 1; i < thread_cnt; i++) { + for (uint32_t i = 0, bbq_head_idx = 1; i < thread_cnt; i++) { if (raw_data[i]->arg->ttype == ttype) { test_time_metric tmp_time = test_clock_time_sub(raw_data[i]->metric_end, raw_data[i]->metric_start); throughput = raw_data[i]->ok_cnt / test_clock_time_to_double(&tmp_time); - printf(" %s-%d 吞吐 :%.0lf/s (%e/s)", name, bbq_idx, throughput, throughput); - bbq_idx++; + printf(" %s-%d 吞吐 :%.0lf/s (%e/s)", name, bbq_head_idx, throughput, throughput); + bbq_head_idx++; } } } diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c index f6bce9d..5cbcea7 100644 --- a/perf/benchmark/bcm_queue.c +++ b/perf/benchmark/bcm_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-21 18:07:20 + * @LastEditTime: 2024-06-25 14:14:40 * @Email: [email protected] * @Describe: TODO */ @@ -42,7 +42,7 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) { q->ring = (void *)rte_ring_create("dpdk_ring", cfg->ring.entries_cnt, rte_socket_id(), flags); if (q->ring == NULL) { - return BBQ_NULL_PTR; + return BBQ_ERR_INPUT_NULL; } q->ring_free_f = (test_ring_free_f)rte_ring_free; @@ -159,7 +159,7 @@ int test_queue_init_rmind(test_cfg *cfg, test_queue_s *q) { int bcm_queue_init(test_cfg *cfg, test_queue_s *q) { if (cfg == NULL || q == NULL) { - return BBQ_NULL_PTR; + return BBQ_ERR_INPUT_NULL; } memset(q, 0, sizeof(*q)); @@ -176,7 +176,7 @@ int bcm_queue_init(test_cfg *cfg, test_queue_s *q) { ret = test_queue_init_rmind(cfg, q); break; default: - return BBQ_UNKNOWN_TYPE; + return BBQ_ERR; } return ret; |
