summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorliuyu <[email protected]>2024-06-27 05:13:48 -0400
committerliuyu <[email protected]>2024-06-27 05:13:48 -0400
commit7f309a3257c04abbf20e5467d081da96413e4d21 (patch)
tree371d7a4997b6a9be0c11ddafa8adaa49e931607f
parentcbaa973c4acad87f03d128bdbfe094f2e578f1f7 (diff)
编码风格、注释、接口调整、性能调整
-rw-r--r--bbq/CMakeLists.txt4
-rw-r--r--bbq/include/bbq.h170
-rw-r--r--bbq/src/bbq.c359
-rw-r--r--bbq/tests/common/test_mix.c2
-rw-r--r--bbq/tests/common/test_queue.c31
-rw-r--r--bbq/tests/common/test_queue.h4
-rw-r--r--bbq/tests/unittest/ut_example.cc32
-rw-r--r--bbq/tests/unittest/ut_head_cursor.cc54
-rw-r--r--bbq/tests/unittest/ut_mix.cc64
-rw-r--r--bbq/tests/unittest/ut_multit.cc3
-rw-r--r--perf/benchmark/bcm_benchmark.c6
-rw-r--r--perf/benchmark/bcm_queue.c8
12 files changed, 402 insertions, 335 deletions
diff --git a/bbq/CMakeLists.txt b/bbq/CMakeLists.txt
index 77020a8..1242a45 100644
--- a/bbq/CMakeLists.txt
+++ b/bbq/CMakeLists.txt
@@ -19,6 +19,10 @@ endif()
add_compile_options(-Wall -Wextra)
+if(CMAKE_BUILD_TYPE STREQUAL "Debug")
+ add_definitions(-DBBQ_MEMORY)
+endif()
+
# 库生成的路径
set(LIB_PATH ${OUTPUT_DIR}/lib)
# 测试程序生成的路径
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
index 56785fc..3b8269d 100644
--- a/bbq/include/bbq.h
+++ b/bbq/include/bbq.h
@@ -1,6 +1,6 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-06-24 10:17:39
+ * @LastEditTime: 2024-06-27 03:04:19
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
@@ -33,52 +33,57 @@ struct bbq_block {
bbq_cursor allocated; // 已分配(version|offset)
bbq_cursor reserved; // 已预留(version|offset)
bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到
- char *entries; // 存储大小可变的entry,分配空间大小:bs * entry_size
+ char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size
} __BBQ_CACHE_ALIGNED;
struct bbq {
char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED;
- int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
- uint32_t bn; // blocks的个数
- uint32_t bs; // blocks.entries的个数
- uint32_t flags; // 标记:retry new 模式,还是drop old模式
- uint32_t idx_bits; // bbq_head里idx所占的位数
- uint32_t off_bits; // bbq_cursor里offset所占的位数
- uint64_t idx_mask; // idx_bits偏移后的掩码
- uint64_t off_mask; // off_bits偏移后的掩码
- uint64_t entry_size; // blocks.entries里每个entry的大小
- bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
- bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
- struct bbq_block *blocks; // bn大小的数组
-};
-#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
-// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */
-// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */
+ int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
+ uint32_t bn; // blocks的个数
+ uint32_t bs; // blocks.entries的个数
+ uint32_t flags; // 标记:retry new 模式,还是drop old模式
+ uint32_t idx_bits; // bbq_head里idx所占的位数
+ uint32_t off_bits; // bbq_cursor里offset所占的位数
+ uint64_t idx_mask; // idx_bits偏移后的掩码
+ uint64_t off_mask; // off_bits偏移后的掩码
+ uint64_t entry_size; // blocks.entries里每个entry的大小
+ bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
+ bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
+
+ struct bbq_block *blocks; // bn大小的数组
+} __BBQ_CACHE_ALIGNED;
#define BBQ_F_DEFAULT 0x0
+
+#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */
-#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */
-#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */
/**
* 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。
* 对应入队函数:bbq_enqueue、bbq_enqueue_burst
* 对应出队函数:bbq_dequeue、bbq_dequeue_burst
*
* @param[in] name
- * bbq名称
+ * 队列名称
* @param[in] count
- * 队列所有entry的个数,count必须大于1,且是2的N次方。
+ * 队列大小,参数必须大于1,且是2的N次方。
* @param[in] socket_id
* 多numa架构下,调用libnuma库函数针对指定socket分配内存。
* 当检测到不支持多numa,将转为malloc分配内存。
* @param[in] flags
* 设置入队策略:
- * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。
- * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
+ * - BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。
+ * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
+ * 设置生产者消费者模式:
+ * - BBQ_F_SP_ENQ:单生产者 BBQ_F_MP_ENQ:多生产者(默认)
+ * - BBQ_F_SC_DEQ:单消费者 BBQ_F_MC_DEQ:多消费者(默认)
* @return
* 非NULL:消息队列结构体指针,用于后续出队入队等操作。
- * NULL:创建失败。
+ * NULL:创建失败,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围
+ * - BBQ_ERR_ALLOC:申请内存失败
+ * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方
+ * - BBQ_ERR_INPUT_NULL:name传入空指针
*/
extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags);
@@ -88,10 +93,14 @@ extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, u
* @param[in] q
* 队列指针
* @param[in] data
- * 则传入一维指针,如:
- * int *data = malloc(sizeof(int));*data = 1; 传入&data
+ * 指向入队指针的指针,如:
+ * int *data = malloc(sizeof(int));*data = TEST_DATA; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码。可能存在以下错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_enqueue(struct bbq *q, void *const *data);
@@ -101,15 +110,19 @@ extern int bbq_enqueue(struct bbq *q, void *const *data);
* @param[in] q
* 队列指针
* @param[out] data
- * 则传入二维指针,如:
+ * 传入二级指针,如:
* int *data = NULL; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_dequeue(struct bbq *q, void **data);
/**
- * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数
+ * 消息队列批量入队(指针入队),尽可能一次入队n个指针,返回实际成功入队个数
*
* @param[in] q
* 队列指针
@@ -118,45 +131,52 @@ extern int bbq_dequeue(struct bbq *q, void **data);
* uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT);
* for(int i=0;i<BUF_CNT;i++){
* obj_table[i] = malloc(sizeof(uint16_t));
- * obj_table[i] = 1;
+ * obj_table[i] = TEST_DATA;
* }
* 传入obj_table
* @param[in] n
* 尝试一次入队的个数
* @param[out] wait_consumed
- * 如果为非NULL,返回当前队列中,已入队的个数。
+ * 如果为非NULL,返回当前队列剩余的个数。注:该赋值可能会带来些许的性能损耗。
* @return
- * 返回实际成功入队的个数
+ * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed);
/**
- * 消息队列批量出队(指针出队),尽可能一次出队n个数据,返回实际成功出队个数
+ * 消息队列批量指针出队,尽可能一次出队n个数据,返回实际成功出队个数
*
* @param[in] q
* 队列指针
* @param[out] obj_table
- * 存储出队的指针,如:
- * uint16_t **obj_table = malloc(sizeof(uint16_t *))
- * 传入obj_table
+ * 用于存储出队的指针,如:
+ * uint16_t **obj_table = malloc(sizeof(uint16_t *)); 传入obj_table
* @param[in] n
* 尝试一次出队的个数
* @param[out] wait_consumed
- * 如果为非NULL,返回当前队列中,已入队的个数。
+ * 如果为非NULL,返回当前队列中,已入队的个数。注:该赋值可能会带来些许的性能损耗
* @return
- * 返回实际成功出队的个数
+ * 返回实际成功出队的个数。当出队返回0时,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed);
/**
- * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针指向的数据拷贝入队。
+ * 创建bbq队列,使用当前函数创建的队列,在后续操作会把指针指向的数据拷贝入队。
* 对应入队函数:bbq_enqueue_elem、bbq_enqueue_burst_elem
* 对应出队函数:bbq_dequeue_elem、bbq_dequeue_burst_elem
*
* @param[in] name
- * bbq名称
+ * 队列名称
* @param[in] count
- * 队列所有entry的个数,count必须大于1,且是2的N次方。
+ * 队列大小,参数必须大于1,且是2的N次方。
* @param[in] socket_id
* 多numa架构下,调用libnuma库函数针对指定socket分配内存。
* 当检测到不支持多numa,将转为malloc分配内存。
@@ -166,7 +186,11 @@ extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, u
* 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
* @return
* 非NULL:消息队列结构体指针,用于后续出队入队等操作。
- * NULL:创建失败。
+ * NULL:创建失败。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围
+ * - BBQ_ERR_ALLOC:申请内存失败
+ * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方
+ * - BBQ_ERR_INPUT_NULL:name传入空指针
*/
extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags);
@@ -176,9 +200,13 @@ extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_
* @param[in] q
* 队列指针
* @param[in] data
- * 传入一维指针,如:int data = 1; 传入&data
+ * 传入一级指针,如:int data = 1; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_enqueue_elem(struct bbq *q, void const *data);
@@ -188,9 +216,13 @@ extern int bbq_enqueue_elem(struct bbq *q, void const *data);
* @param[in] q
* 队列指针
* @param[in] data
- * 则传入一维指针,如:int data; 传入&data
+ * 则传入一级指针,如:int data; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_dequeue_elem(struct bbq *q, void *data);
@@ -200,14 +232,18 @@ extern int bbq_dequeue_elem(struct bbq *q, void *data);
* @param[in] q
* 队列指针
* @param[in] obj_table
- * 即将入队的数组,将数组里的每个成员入队,如:
+ * 将数组里的每个数据入队,如:
* uint16_t obj_table[1024] = {初始化数据}; 传入obj_table
* @param[in] n
* 尝试一次入队的个数
* @param[out] wait_consumed
- * 如果为非NULL,返回当前队列中,已入队的个数。
+ * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗
* @return
- * 返回实际成功入队个数
+ * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed);
@@ -222,9 +258,13 @@ extern uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uin
* @param[in] n
* 尝试一次出队的个数
* @param[out] wait_consumed
- * 如果为非NULL,返回当前队列中,已入队的个数。
+ * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗
* @return
- * 返回实际成功出队个数
+ * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed);
@@ -255,15 +295,17 @@ extern void bbq_destory(struct bbq *q);
*/
extern void bbq_debug_struct_print(struct bbq *q);
-// 通用返回码
-#define BBQ_OK 0 // 成功
-#define BBQ_ERROR -1 // 通用错误
-#define BBQ_ALLOC_ERR -2 // 内存分配失败
-#define BBQ_NULL_PTR -3 // 空指针
-#define BBQ_UNKNOWN_TYPE -3 // 未知类型
+// 错误码
+#define BBQ_OK 0 // 成功
+#define BBQ_ERR -1 // 通用错误,无法分类时使用
+#define BBQ_ERR_ALLOC -2 // 内存分配失败
+#define BBQ_ERR_INPUT_NULL -3 // 传入空指针
+#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方
+#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围
+
+#define BBQ_ERR_FULL -101 // 队列已满(入队失败)
+#define BBQ_ERR_BUSY -102 // 队列忙碌中(入队或出队失败)
+#define BBQ_ERR_EMPTY -103 // 队列已空(出队失败)
+#define BBQ_ERR_NOT_SUPPORT -104 // 不支持的操作
-// 队列错误
-#define BBQ_QUEUE_FULL -1001 // 队列已满(入队失败)
-#define BBQ_QUEUE_BUSY -1002 // 队列忙碌中(入队或出队失败)
-#define BBQ_QUEUE_EMPTY -1003 // 队列已空(出队失败)
-#define BBQ_QUEUE_DATA_ERR -1004 // 传入的数据异常 \ No newline at end of file
+extern __thread int32_t bbq_errno; \ No newline at end of file
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index d5bdf03..5e63c51 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-24 10:11:24
+ * @LastEditTime: 2024-06-26 04:59:28
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -12,12 +12,12 @@
#include <string.h>
// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针"
-#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */
-#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */
+#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */
+#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */
// 判断flags标记位
#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD)
-#define BBQ_F_CHK_VALUE(flags) (flags & BBQ_F_COPY_VALUE)
+#define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE)
// 避免无用参数的编译告警
#define AVOID_WARNING(param) ((void)param)
@@ -27,6 +27,8 @@
printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \
} while (0)
+__thread int32_t bbq_errno;
+
struct bbq_status {
int32_t status; // 返回状态
uint32_t actual_burst; // 实际出/入队个数
@@ -42,46 +44,45 @@ enum bbq_queue_state {
};
struct bbq_entry_desc {
- uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释
- uint64_t off; // entry在当前block的偏移(offset)
+ uint64_t vsn; // allocated或reserved的版本(vsn)
+ uint64_t off; // entry在当前块的偏移(offset)
uint32_t actual_burst; // 实际出/入队个数
- struct bbq_block *block; // 指向所在的block
+ struct bbq_block *block; // 指向所在的块
};
struct bbq_queue_state_s {
- enum bbq_queue_state state; // 队列状态
- union { // TODO:
- uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效
+ enum bbq_queue_state state; // 队列状态
+ union {
+ uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效
struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效
};
};
-extern inline uint64_t bbq_idx(struct bbq *q, uint64_t x) {
+extern inline uint64_t bbq_head_idx(struct bbq *q, uint64_t x) {
return x & q->idx_mask;
}
-extern inline uint64_t bbq_off(struct bbq *q, uint64_t x) {
- return x & q->off_mask;
-}
-
extern inline uint64_t bbq_head_vsn(struct bbq *q, uint64_t x) {
return x >> q->idx_bits;
}
+extern inline uint64_t bbq_cur_off(struct bbq *q, uint64_t x) {
+ return x & q->off_mask;
+}
+
extern inline uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x) {
return x >> q->off_bits;
}
-extern inline uint64_t set_cur_vsn(struct bbq *q, uint64_t ver) {
+static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) {
return ver << q->off_bits;
}
// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
-// #define BBQ_MEMORY
enum bbq_module {
- BBQ_MODULE_QUEUE = 0,
- BBQ_MODULE_QUEUE_BLOCK_NB,
- BBQ_MODULE_QUEUE_BLOCK_ENTRY,
+ BBQ_MODULE_MAIN = 0,
+ BBQ_MODULE_BLOCK_NB,
+ BBQ_MODULE_BLOCK_ENTRY,
BBQ_MODULE_MAX,
};
@@ -132,7 +133,7 @@ static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t si
}
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
-uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) {
+uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd) {
uint64_t old_value = 0;
do {
old_value = atomic_load(atom);
@@ -143,7 +144,7 @@ uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) {
/* 检查参数是否为2的N次幂 */
bool bbq_check_power_of_two(uint32_t n) {
- if (n <= 0) {
+ if (n == 0) {
return false;
}
@@ -152,7 +153,7 @@ bool bbq_check_power_of_two(uint32_t n) {
/* 根据entries大小返回合理的block个数
* 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/
-uint32_t bbq_blocks_calc(uint32_t entries) {
+static uint32_t bbq_block_number_calc(uint32_t entries) {
double log_entries = log2((double)entries);
uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整
uint32_t max_value = (over4 > 1) ? over4 : 1;
@@ -160,22 +161,44 @@ uint32_t bbq_blocks_calc(uint32_t entries) {
return n;
}
+int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) {
+ if (bn == NULL || bs == NULL) {
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return bbq_errno;
+ }
+
+ if (entries <= 1) {
+ bbq_errno = BBQ_ERR_OUT_OF_RANGE;
+ return bbq_errno;
+ }
+
+ if (bbq_check_power_of_two(entries) == false) {
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
+ return bbq_errno;
+ }
+
+ *bn = bbq_block_number_calc(entries);
+ *bs = entries / *bn;
+
+ return BBQ_OK;
+}
+
/* 块初始化 */
int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
#ifdef BBQ_MEMORY
- // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题
- block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
+ // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题
+ block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
(q->bs + 1) * q->entry_size);
char *last_entry = block->entries + q->entry_size * q->bs;
memset(last_entry, BBQ_MEM_MAGIC, q->entry_size);
#else
- block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
+ block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
q->bs * q->entry_size);
#endif
if (block->entries == NULL) {
- BBQ_ERR_LOG("bbq_malloc error");
- return BBQ_ALLOC_ERR;
+ bbq_errno = BBQ_ERR_ALLOC;
+ return bbq_errno;
}
block->committed = ATOMIC_VAR_INIT(0);
@@ -189,7 +212,7 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
block->allocated = ATOMIC_VAR_INIT(q->bs);
block->reserved = ATOMIC_VAR_INIT(q->bs);
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
- block->consumed = ATOMIC_VAR_INIT(0);
+ block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed
} else {
block->consumed = ATOMIC_VAR_INIT(q->bs);
}
@@ -202,11 +225,11 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
void block_destory(struct bbq *q, struct bbq_block *block) {
if (block->entries) {
#ifdef BBQ_MEMORY
- bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
- block->entries, sizeof(*block->entries) * (q->bs + 1) * q->entry_size);
+ bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
+ block->entries, (q->bs + 1) * q->entry_size);
#else
- bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
- block->entries, sizeof(*block->entries) * q->bs * q->entry_size);
+ bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
+ block->entries, q->bs * q->entry_size);
#endif
block->entries = NULL;
}
@@ -214,21 +237,18 @@ void block_destory(struct bbq *q, struct bbq_block *block) {
/*
求x在二进制表示中最高位1所在的位置,x参数不能为0。
-例如:x=1,return 0 (...1)
-x=3,return 1 (..11)
-x=9,return 3 (1..1)
+例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1)
*/
-unsigned floor_log2(uint64_t x) {
- return x == 1 ? 0 : 1 + floor_log2(x >> 1);
+static unsigned bbq_floor_log2(uint64_t x) {
+ return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1);
}
/*
返回以2为底x的对数,并向上取整值。
-例如:x=1,return 0 (2^0=1)
-x=99, return 7(2^6=64 2^7=128)
+例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128)
*/
-unsigned ceil_log2(uint64_t x) {
- return x == 1 ? 0 : floor_log2(x - 1) + 1;
+static unsigned bbq_ceil_log2(uint64_t x) {
+ return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1;
}
/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */
@@ -236,33 +256,33 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
int ret = 0;
if (bbq_check_power_of_two(bn) == false) {
- BBQ_ERR_LOG("block number is not power of two, now is :%u", bn);
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
}
if (bbq_check_power_of_two(bs) == false) {
- BBQ_ERR_LOG("block size is not power of two, now is :%u", bs);
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
}
- if (obj_size == 0) {
- BBQ_ERR_LOG("obj_size is 0");
+ if (name == NULL) {
+ bbq_errno = BBQ_ERR_INPUT_NULL;
return NULL;
}
- if (name == NULL || strlen(name) >= BBQ_SYMBOL_MAX - 1) {
- BBQ_ERR_LOG("invalid bbq name, max len is %d", BBQ_SYMBOL_MAX - 1);
+ if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0) {
+ bbq_errno = BBQ_ERR_OUT_OF_RANGE;
return NULL;
}
if (numa_available() < 0) {
- // 不支持numa,设置
+ // 不支持numa
socket_id = BBQ_SOCKET_ID_ANY;
}
- struct bbq *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q));
+ struct bbq *q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q));
if (q == NULL) {
- BBQ_ERR_LOG("malloc for bbq queue error");
+ bbq_errno = BBQ_ERR_ALLOC;
return NULL;
}
memset(q, 0, sizeof(*q));
@@ -275,25 +295,22 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
q->chead = ATOMIC_VAR_INIT(0);
q->flags = flags;
- q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks));
+ q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks));
if (q->blocks == NULL) {
- BBQ_ERR_LOG("bbq malloc for blocks error");
+ bbq_errno = BBQ_ERR_ALLOC;
goto error;
}
memset(q->blocks, 0, sizeof(*q->blocks));
for (uint32_t i = 0; i < bn; ++i) {
- // 第一个block不需要设置cursor_init
- bool cursor_init = (i == 0 ? false : true);
- ret = block_init(q, &(q->blocks[i]), cursor_init);
+ ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true));
if (ret != BBQ_OK) {
- BBQ_ERR_LOG("bbq block init error");
goto error;
}
}
- q->idx_bits = ceil_log2(bn);
- q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位
+ q->idx_bits = bbq_ceil_log2(bn);
+ q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 TODO:bs占位少的时候多分一些?
q->idx_mask = (1 << q->idx_bits) - 1;
q->off_mask = (1 << q->off_bits) - 1;
@@ -305,37 +322,41 @@ error:
return NULL;
}
-struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) {
+/* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */
+struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) {
+ bbq_errno = BBQ_OK;
return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
}
-struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
+/* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */
+struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
+ bbq_errno = BBQ_OK;
return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
}
/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */
struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) {
- if (bbq_check_power_of_two(count) == false || count == 1) {
- BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count);
+ bbq_errno = BBQ_OK;
+ uint32_t bn = 0;
+ uint32_t bs = 0;
+
+ if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) {
return NULL;
}
- uint32_t bn = bbq_blocks_calc(count);
- uint32_t bs = count / bn;
-
- return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags);
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
}
struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) {
- if (bbq_check_power_of_two(count) == false || count == 1) {
- BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count);
+ bbq_errno = BBQ_OK;
+ uint32_t bn = 0;
+ uint32_t bs = 0;
+
+ if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) {
return NULL;
}
- uint32_t bn = bbq_blocks_calc(count);
- uint32_t bs = count / bn;
-
- return bbq_create_bnbs(name, bn, bs, socket_id, flags);
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
}
/* 释放消息队列,与bbq_ring_create系列接口成对*/
@@ -348,8 +369,8 @@ void bbq_destory(struct bbq *q) {
block_destory(q, &(q->blocks[i]));
}
- bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks));
- bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q));
+ bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks));
+ bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q));
}
#define BBQ_DATA_TYPE_SINGLE 0x0
@@ -358,7 +379,7 @@ void bbq_destory(struct bbq *q) {
void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) {
size_t idx = e->off * q->entry_size;
- if (BBQ_F_CHK_VALUE(q->flags)) {
+ if (BBQ_F_CHK_COPY_VALUE(q->flags)) {
// 数据入队列
switch (data_type) {
case BBQ_DATA_TYPE_ARRAY_1D:
@@ -376,6 +397,7 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin
break;
}
default:
+ bbq_errno = BBQ_ERR;
break;
}
} else {
@@ -387,8 +409,8 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin
memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
break;
case BBQ_DATA_TYPE_ARRAY_1D:
- break;
default:
+ bbq_errno = BBQ_ERR;
break;
}
}
@@ -397,17 +419,17 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin
struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
struct bbq_queue_state_s state = {0};
- if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) {
+ if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) {
state.state = BBQ_BLOCK_DONE;
return state;
}
uint64_t old = atomic_fetch_add(&block->allocated, n);
+ // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1
uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed));
- // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1
uint64_t cur_vsn = bbq_cur_vsn(q, old);
- uint64_t cur_off = bbq_off(q, old);
+ uint64_t cur_off = bbq_cur_off(q, old);
if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) {
state.state = BBQ_BLOCK_DONE;
return state;
@@ -429,27 +451,28 @@ struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block,
}
enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
- // 获取下一个block
uint64_t cur = 0;
- struct bbq_block *n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]);
+ // 获取下一个block
+ struct bbq_block *n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]);
uint64_t ph_vsn = bbq_head_vsn(q, ph);
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
cur = atomic_load(&n_blk->committed);
- // 生产者避免前进到上一轮中尚未完全提交的区块
- if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) {
+ // 生产者head避免覆盖上一轮尚未完全提交的区块
+ if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) {
return BBQ_NOT_AVAILABLE;
}
} else {
cur = atomic_load(&n_blk->consumed);
uint64_t reserved;
- uint64_t consumed_off = bbq_off(q, cur);
+ uint64_t consumed_off = bbq_cur_off(q, cur);
uint64_t consumed_vsn = bbq_cur_vsn(q, cur);
- if (consumed_vsn < ph_vsn || // 生产者赶上了消费者
+ if (consumed_vsn < ph_vsn ||
(consumed_vsn == ph_vsn && consumed_off != q->bs)) {
+ // 生产者赶上了消费者
reserved = atomic_load(&n_blk->reserved);
- if (bbq_off(q, reserved) == consumed_off) {
+ if (bbq_cur_off(q, reserved) == consumed_off) {
return BBQ_NO_ENTRY;
} else {
return BBQ_NOT_AVAILABLE;
@@ -457,19 +480,20 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
}
}
- // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。
- uint64_t new_vsn = set_cur_vsn(q, ph_vsn + 1);
- fetch_max(&n_blk->committed, new_vsn);
- fetch_max(&n_blk->allocated, new_vsn);
+ // 用head的version初始化下一个块
+ // version在高位,version+1,index或offset也会被清零,如果没有被其他线程执行过。多线程同时只更新一次。
+ uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1);
+ bbq_fetch_max(&n_blk->committed, new_vsn);
+ bbq_fetch_max(&n_blk->allocated, new_vsn);
- // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1
- fetch_max(&q->phead, ph + 1);
+ // ph+1,当超过索引范围,进入下一轮时,version会自动+1
+ bbq_fetch_max(&q->phead, ph + 1);
return BBQ_SUCCESS;
}
static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t *ph_ptr, struct bbq_block *blk_ph) {
- uint64_t ch;
- uint64_t ph;
+ uint64_t ch = 0;
+ uint64_t ph = 0;
if (ch_ptr != NULL) {
ch = *ch_ptr;
} else {
@@ -482,14 +506,14 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t
ph = atomic_load(&q->phead);
}
- uint64_t ph_idx = bbq_idx(q, ph);
- uint64_t ch_idx = bbq_idx(q, ch);
- uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed));
+ uint64_t ph_idx = bbq_head_idx(q, ph);
+ uint64_t ch_idx = bbq_head_idx(q, ch);
+ uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed));
- struct bbq_block *blk_ch = &(q->blocks[bbq_idx(q, ch)]);
- uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved));
+ struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]);
+ uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved));
- // "生产者"超过"消费者"的索引(即块)的个数
+ // "生产者"超过"消费者"块的个数
uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx;
if (!BBQ_F_CHK_DROP_OLD(q->flags)) {
// 这里idx_diff-1=-1也是正确。
@@ -500,36 +524,37 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t
uint64_t ph_vsn = bbq_head_vsn(q, ph);
if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) {
+ // drop old模式,未发生覆盖
return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
}
- // 发生了覆盖
if (ph_idx == ch_idx) {
- // 当前块以及之前已生产的都作废
+ // drop old模式,发生了覆盖,当前块以及之前已生产的都作废
return 0;
}
return (idx_diff - 1) * q->bs + committed_off;
}
+//-----------------------------------------------------------------
/* 消息队列入队 */
-static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t flag, uint32_t *wait_consumed) {
+static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) {
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || data == NULL) {
- ret.status = BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ ret.status = bbq_errno;
return ret;
}
while (true) {
- // 获取当前phead,转为索引后获取到当前的blk
uint64_t ph = atomic_load(&q->phead);
- struct bbq_block *blk = &(q->blocks[bbq_idx(q, ph)]);
+ struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ph)]);
struct bbq_queue_state_s state = allocate_entry(q, blk, n);
switch (state.state) {
case BBQ_ALLOCATED:
- commit_entry(q, &state.e, data, flag);
+ commit_entry(q, &state.e, data, data_type);
ret.actual_burst = state.e.actual_burst;
ret.status = BBQ_OK;
break;
@@ -540,17 +565,21 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
if (pstate == BBQ_NO_ENTRY) {
- ret.status = BBQ_QUEUE_FULL;
+ bbq_errno = BBQ_ERR_FULL;
+ ret.status = bbq_errno;
} else if (pstate == BBQ_NOT_AVAILABLE) {
- ret.status = BBQ_QUEUE_BUSY;
+ bbq_errno = BBQ_ERR_BUSY;
+ ret.status = bbq_errno;
} else {
- ret.status = BBQ_ERROR;
+ bbq_errno = BBQ_ERR;
+ ret.status = bbq_errno;
}
break;
}
default:
- ret.status = BBQ_ERROR;
+ bbq_errno = BBQ_ERR;
+ ret.status = bbq_errno;
break;
}
@@ -563,21 +592,23 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
int bbq_enqueue(struct bbq *q, void *const *data) {
+ bbq_errno = BBQ_OK;
struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
int bbq_enqueue_elem(struct bbq *q, void const *data) {
+ bbq_errno = BBQ_OK;
struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
/* 更新成功 reserve成功的个数 */
-uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) {
+uint32_t bbq_reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) {
// TODO:逻辑可以合并
if (n == 1) {
// fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
- if (fetch_max(aotmic, reserved + 1) == reserved) {
+ if (bbq_fetch_max(aotmic, reserved + 1) == reserved) {
return 1;
}
@@ -588,11 +619,11 @@ uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) {
}
}
-struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
+struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
while (true) {
struct bbq_queue_state_s state;
uint64_t reserved = atomic_load(&block->reserved);
- uint64_t reserved_off = bbq_off(q, reserved);
+ uint64_t reserved_off = bbq_cur_off(q, reserved);
uint64_t reserved_svn = bbq_cur_vsn(q, reserved);
if (reserved_off < q->bs) {
@@ -607,7 +638,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, u
}
uint64_t committed = atomic_load(&block->committed);
- uint64_t committed_off = bbq_off(q, committed);
+ uint64_t committed_off = bbq_cur_off(q, committed);
if (committed_off == reserved_off) {
state.state = BBQ_NO_ENTRY;
return state;
@@ -616,14 +647,14 @@ struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, u
// 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据
if (committed_off != q->bs) {
uint64_t allocated = atomic_load(&block->allocated);
- if (bbq_off(q, allocated) != committed_off) {
+ if (bbq_cur_off(q, allocated) != committed_off) {
state.state = BBQ_NOT_AVAILABLE;
return state;
}
}
uint32_t tmp = committed_off - reserved_off;
- uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n);
+ uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n);
if (reserved_cnt > 0) { // TODO:多entry时关注
state.state = BBQ_RESERVED;
state.e.actual_burst = reserved_cnt;
@@ -647,7 +678,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, u
bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) {
size_t idx = e->off * q->entry_size;
- if (BBQ_F_CHK_VALUE(q->flags)) {
+ if (BBQ_F_CHK_COPY_VALUE(q->flags)) {
switch (data_type) {
case BBQ_DATA_TYPE_ARRAY_1D:
case BBQ_DATA_TYPE_SINGLE:
@@ -694,7 +725,7 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint
}
bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
- uint64_t ch_idx = bbq_idx(q, ch);
+ uint64_t ch_idx = bbq_head_idx(q, ch);
struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
uint64_t ch_vsn = bbq_head_vsn(q, ch);
@@ -705,18 +736,18 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
// 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1
if (committed_vsn < ver + (ch_idx == 0))
return false;
- fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn));
+ bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn));
} else {
if (committed_vsn != ch_vsn + 1) {
// 消费者追上了生产者,下一块还未开始生产
return false;
}
- uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1);
- fetch_max(&n_blk->consumed, new_vsn);
- fetch_max(&n_blk->reserved, new_vsn);
+ uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1);
+ bbq_fetch_max(&n_blk->consumed, new_vsn);
+ bbq_fetch_max(&n_blk->reserved, new_vsn);
}
- fetch_max(&q->chead, ch + 1);
+ bbq_fetch_max(&q->chead, ch + 1);
return true;
}
@@ -724,15 +755,16 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) {
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || deq_data == NULL) {
- ret.status = BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ ret.status = bbq_errno;
return ret;
}
while (true) {
uint64_t ch = atomic_load(&q->chead);
- struct bbq_block *blk = &(q->blocks[bbq_idx(q, ch)]);
+ struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]);
struct bbq_queue_state_s state;
- state = reserve_entry(q, blk, n);
+ state = bbq_reserve_entry(q, blk, n);
switch (state.state) {
case BBQ_RESERVED:
@@ -743,19 +775,23 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
ret.actual_burst = state.e.actual_burst;
break;
case BBQ_NO_ENTRY:
- ret.status = BBQ_QUEUE_EMPTY;
+ bbq_errno = BBQ_ERR_EMPTY;
+ ret.status = bbq_errno;
break;
case BBQ_NOT_AVAILABLE:
- ret.status = BBQ_QUEUE_BUSY;
+ bbq_errno = BBQ_ERR_BUSY;
+ ret.status = bbq_errno;
break;
case BBQ_BLOCK_DONE:
if (advance_chead(q, ch, state.vsn)) {
continue;
}
- ret.status = BBQ_QUEUE_EMPTY;
+ bbq_errno = BBQ_ERR_EMPTY;
+ ret.status = bbq_errno;
break;
default:
- ret.status = BBQ_ERROR;
+ bbq_errno = BBQ_ERR;
+ ret.status = bbq_errno;
break;
}
@@ -768,16 +804,18 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
int bbq_dequeue(struct bbq *q, void **data) {
+ bbq_errno = BBQ_OK;
struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
int bbq_dequeue_elem(struct bbq *q, void *data) {
+ bbq_errno = BBQ_OK;
struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
-uint32_t bbq_max_burst(struct bbq *q, uint32_t n) {
+static uint32_t bbq_max_burst(struct bbq *q, uint32_t n) {
uint32_t burst = n;
if (burst > q->bs) {
burst = q->bs;
@@ -788,11 +826,13 @@ uint32_t bbq_max_burst(struct bbq *q, uint32_t n) {
static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
- if (!BBQ_F_CHK_VALUE(q->flags)) {
- return BBQ_QUEUE_DATA_ERR;
+ if (!BBQ_F_CHK_COPY_VALUE(q->flags)) {
+ bbq_errno = BBQ_ERR_NOT_SUPPORT;
+ return 0;
}
uint32_t burst = 0;
@@ -815,7 +855,8 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq *q, void *obj_table
static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
uint32_t burst = 0;
@@ -837,13 +878,15 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq *q, void **obj_tabl
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+static uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
- if (!BBQ_F_CHK_VALUE(q->flags)) {
- return BBQ_QUEUE_DATA_ERR;
+ if (!BBQ_F_CHK_COPY_VALUE(q->flags)) {
+ bbq_errno = BBQ_ERR_NOT_SUPPORT;
+ return 0;
}
uint32_t burst = 0;
@@ -865,9 +908,10 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table,
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
uint32_t burst = 0;
@@ -894,8 +938,8 @@ bool bbq_empty(struct bbq *q) {
uint64_t ph_vsn = bbq_head_vsn(q, phead);
uint64_t ch_vsn = bbq_head_vsn(q, chead);
- uint64_t ph_idx = bbq_idx(q, phead);
- uint64_t ch_idx = bbq_idx(q, chead);
+ uint64_t ph_idx = bbq_head_idx(q, phead);
+ uint64_t ch_idx = bbq_head_idx(q, chead);
struct bbq_block *block;
@@ -905,13 +949,13 @@ bool bbq_empty(struct bbq *q) {
block = &q->blocks[ph_idx];
if (ph_vsn == ch_vsn) {
- if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) {
+ if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) {
return true;
}
}
bbq_cursor reserved = atomic_load(&block->reserved);
- uint64_t reserved_off = bbq_off(q, reserved);
+ uint64_t reserved_off = bbq_cur_off(q, reserved);
if (BBQ_F_CHK_DROP_OLD(q->flags) &&
ph_vsn > ch_vsn &&
@@ -925,22 +969,27 @@ bool bbq_empty(struct bbq *q) {
}
uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ bbq_errno = BBQ_OK;
return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed);
}
uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ bbq_errno = BBQ_OK;
return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ bbq_errno = BBQ_OK;
return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) {
+ bbq_errno = BBQ_OK;
return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ bbq_errno = BBQ_OK;
return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed);
}
@@ -1014,11 +1063,11 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
bbq_cursor reserved = atomic_load(&block->reserved);
bbq_cursor consumed = atomic_load(&block->consumed);
printf(" allocated:%lu committed:%lu reserved:%lu",
- bbq_off(q, allocated), bbq_off(q, committed), bbq_off(q, reserved));
+ bbq_cur_off(q, allocated), bbq_cur_off(q, committed), bbq_cur_off(q, reserved));
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
printf("\n");
} else {
- printf(" consumed:%lu\n", bbq_off(q, consumed));
+ printf(" consumed:%lu\n", bbq_cur_off(q, consumed));
}
}
@@ -1027,17 +1076,17 @@ void bbq_debug_struct_print(struct bbq *q) {
uint64_t phead = atomic_load(&q->phead);
uint64_t chead = atomic_load(&q->chead);
- printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs);
- printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead));
+ printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
+ printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));
- uint64_t ph_idx = bbq_idx(q, phead);
- uint64_t ch_idx = bbq_idx(q, chead);
+ uint64_t ph_idx = bbq_head_idx(q, phead);
+ uint64_t ch_idx = bbq_head_idx(q, chead);
if (ph_idx != ch_idx) {
printf("block[%lu]\n", ph_idx);
bbq_debug_block_print(q, &(q->blocks[ph_idx]));
}
- printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead));
+ printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead));
printf("block[%lu]\n", ch_idx);
bbq_debug_block_print(q, &(q->blocks[ch_idx]));
} \ No newline at end of file
diff --git a/bbq/tests/common/test_mix.c b/bbq/tests/common/test_mix.c
index bcadaa5..28468f1 100644
--- a/bbq/tests/common/test_mix.c
+++ b/bbq/tests/common/test_mix.c
@@ -162,7 +162,7 @@ int test_setaffinity(int core_id) {
if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) == -1) {
fprintf(stderr, "pthread_setaffinity_np erro\n");
- return BBQ_ERROR;
+ return BBQ_ERR;
}
return BBQ_OK;
diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c
index f4726d9..0934285 100644
--- a/bbq/tests/common/test_queue.c
+++ b/bbq/tests/common/test_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-24 10:16:28
+ * @LastEditTime: 2024-06-27 02:50:17
* @Describe: TODO
*/
@@ -11,8 +11,8 @@
#include <sys/prctl.h>
#include <unistd.h>
extern bool bbq_debug_check_array_bounds(struct bbq *q);
-extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
-extern struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags);
+extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
+extern struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags);
uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
TEST_AVOID_WARNING(thread_idx);
@@ -23,14 +23,14 @@ int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
if (cfg->ring.block_count == 0) {
q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
} else {
- q->ring = bbq_create_bnbs("test_bbq", cfg->ring.block_count,
- cfg->ring.entries_cnt / cfg->ring.block_count,
- BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ q->ring = bbq_create_with_bnbs("test_bbq", cfg->ring.block_count,
+ cfg->ring.entries_cnt / cfg->ring.block_count,
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
}
if (q->ring == NULL) {
TEST_ERR_LOG("bbq create queue failed");
- return BBQ_NULL_PTR;
+ return BBQ_ERR_INPUT_NULL;
}
q->ring_free_f = (test_ring_free_f)bbq_destory;
@@ -104,10 +104,17 @@ void test_data_destory(test_data **data, size_t cnt) {
test_free(TEST_MODULE_DATA, data);
}
-uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) {
+uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt,
+ test_time_metric *op_use_diff, uint16_t thread_idx) {
uint32_t enqueue_cnt = 0;
test_time_metric op_use_start = test_clock_time_get();
+#if 0
+ // wait_consumed会导致bbq损失部分性能
+ uint32_t wait_consumed = 0;
+ enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, &wait_consumed);
+#else
enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, NULL);
+#endif
*op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start);
return enqueue_cnt;
@@ -138,7 +145,6 @@ void *test_thread_producer_start(void *arg) {
uint64_t op_err_latency_ns = 0;
uint64_t run_ok_times = cfg->run.run_ok_times / cfg->ring.producer_cnt;
test_time_metric op_latency = {0};
-
snprintf(thread_name, sizeof(thread_name), "producer:%lu", exit_data->thread_id);
prctl(PR_SET_NAME, thread_name);
if (test_setaffinity(t_arg->core) != BBQ_OK) {
@@ -222,13 +228,10 @@ void *test_thread_consumer_start(void *arg) {
exit_data->metric_start = test_clock_time_get();
- uint32_t last_times = cfg->ring.entries_cnt;
while (true) {
- if (!test_info->ctl.running || test_all_producer_exit(test_info)) {
+ if (test_all_producer_exit(test_info) && deq_cnt == 0) {
// 运行时间到了或是所有生产者退出了,检查生产者是否全部退出,且队列被消费完了
- if (deq_cnt == 0 && (last_times-- <= 0)) {
- break;
- }
+ break;
}
deq_cnt = test_exec_dequeue(q, deq_data, cfg->ring.burst_cnt, &op_latency);
diff --git a/bbq/tests/common/test_queue.h b/bbq/tests/common/test_queue.h
index 8d40e92..9edefdb 100644
--- a/bbq/tests/common/test_queue.h
+++ b/bbq/tests/common/test_queue.h
@@ -91,8 +91,8 @@ extern void test_wait_all_threads_ready(test_ctl *ctl);
extern void test_queue_destory(test_queue_s *q);
extern int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q);
extern void test_merge_all_data(test_exit_data **exit_data, uint32_t thread_cnt, test_merge_s *merge);
-extern uint64_t bbq_idx(struct bbq *q, uint64_t x);
-extern uint64_t bbq_off(struct bbq *q, uint64_t x);
+extern uint64_t bbq_head_idx(struct bbq *q, uint64_t x);
+extern uint64_t bbq_cur_off(struct bbq *q, uint64_t x);
extern uint64_t bbq_head_vsn(struct bbq *q, uint64_t x);
extern uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x);
extern test_data **test_data_create(size_t cnt);
diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc
index 64ee3f6..0439612 100644
--- a/bbq/tests/unittest/ut_example.cc
+++ b/bbq/tests/unittest/ut_example.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-24 10:13:21
+ * @LastEditTime: 2024-06-25 11:40:27
* @Describe: 简单的测试用例,测试基本功能
*/
@@ -11,7 +11,6 @@ extern "C" {
#include "test_queue.h"
#include "ut.h"
extern bool bbq_malloc_free_equal();
-extern void bbq_debug_memory_print();
extern bool bbq_debug_check_array_bounds(struct bbq *q);
extern void bbq_struct_print(struct bbq *q);
extern uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed);
@@ -57,7 +56,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) {
ASSERT_NE(q, nullptr);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
// 全部入队成功
for (uint32_t i = 0; i < 4000; i++) {
@@ -88,7 +87,8 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) {
EXPECT_EQ(cnt, BUF_CNT);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+ bbq_destory(q);
}
TEST_F(bbq_example, single_retry_new_cp_value) {
@@ -101,7 +101,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) {
ASSERT_NE(q, nullptr);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
// 全部入队成功
for (uint32_t i = 0; i < 4000; i++) {
@@ -132,7 +132,8 @@ TEST_F(bbq_example, single_retry_new_cp_value) {
EXPECT_EQ(cnt, BUF_CNT);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY);
+ bbq_destory(q);
}
TEST_F(bbq_example, single_drop_old_cp_pointer) {
@@ -148,7 +149,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) {
EXPECT_LT(second_cnt, q->bs * q->bn);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
// 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈)
uint32_t loop = 3;
@@ -184,7 +185,9 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) {
EXPECT_EQ(cnt, second_cnt - q->bs);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+
+ bbq_destory(q);
}
TEST_F(bbq_example, single_drop_old_cp_value) {
@@ -200,7 +203,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) {
EXPECT_LT(second_cnt, q->bs * q->bn);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY);
// 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈)
uint32_t loop = 3;
@@ -234,9 +237,10 @@ TEST_F(bbq_example, single_drop_old_cp_value) {
// 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。
EXPECT_EQ(cnt, second_cnt - q->bs);
-
// 空队出队失败
- EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY);
+
+ bbq_destory(q);
}
TEST_F(bbq_example, burst_retry_new_cp_value) {
@@ -285,8 +289,8 @@ TEST_F(bbq_example, burst_retry_new_cp_value) {
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
- bbq_destory(q);
test_free(TEST_MODULE_DATA, deq_table2);
+ bbq_destory(q);
}
TEST_F(bbq_example, burst_retry_new_cp_pointer) {
@@ -334,8 +338,8 @@ TEST_F(bbq_example, burst_retry_new_cp_pointer) {
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
- bbq_destory(q);
test_free(TEST_MODULE_DATA, deq_table2);
+ bbq_destory(q);
}
TEST_F(bbq_example, burst_drop_old_cp_pointer) {
@@ -384,8 +388,8 @@ TEST_F(bbq_example, burst_drop_old_cp_pointer) {
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
- bbq_destory(q);
test_free(TEST_MODULE_DATA, deq_table2);
+ bbq_destory(q);
}
TEST_F(bbq_example, burst_drop_old_cp_value) {
diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc
index b8bff37..e3b3b50 100644
--- a/bbq/tests/unittest/ut_head_cursor.cc
+++ b/bbq/tests/unittest/ut_head_cursor.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-24 10:15:57
+ * @LastEditTime: 2024-06-25 11:42:49
* @Describe: TODO
*/
@@ -9,9 +9,8 @@ extern "C" {
#include "test_queue.h"
#include "ut.h"
extern bool bbq_malloc_free_equal();
-extern void bbq_debug_memory_print();
extern bool bbq_debug_check_array_bounds(struct bbq *q);
-extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
+extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
}
class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
@@ -31,32 +30,32 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
};
void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_idx(q, q->phead), idx) << "line: " << line;
+ EXPECT_EQ(bbq_head_idx(q, q->phead), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line;
}
void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_idx(q, q->chead), idx) << "line: " << line;
+ EXPECT_EQ(bbq_head_idx(q, q->chead), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line;
}
void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_off(q, block->allocated), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_off(q, block->allocated), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line;
}
void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_off(q, block->committed), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_off(q, block->committed), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line;
}
void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_off(q, block->consumed), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_off(q, block->consumed), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line;
}
void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
- EXPECT_EQ(bbq_off(q, block->reserved), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_off(q, block->reserved), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line;
}
@@ -65,7 +64,7 @@ TEST_F(bbq_head_cursor, init) {
struct bbq *q;
uint32_t bn = 2;
uint32_t bs = 4;
- q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
ASSERT_NE(q, nullptr);
// 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目
@@ -84,8 +83,6 @@ TEST_F(bbq_head_cursor, init) {
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
bbq_destory(q);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
void ut_produce_something(uint32_t produce_cnt) {
@@ -99,7 +96,7 @@ void ut_produce_something(uint32_t produce_cnt) {
EXPECT_GT(produce_cnt, 0);
EXPECT_LE(produce_cnt, bs);
- q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
ASSERT_NE(q, nullptr);
// 生产produce_cnt
@@ -144,8 +141,6 @@ TEST_F(bbq_head_cursor, produce_something) {
ut_produce_something(567);
ut_produce_something(789);
ut_produce_something(4096);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
void ut_produce_next_block(uint32_t over) {
@@ -160,7 +155,7 @@ void ut_produce_next_block(uint32_t over) {
EXPECT_GT(over, 0);
EXPECT_LT(over, bs);
- q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
ASSERT_NE(q, nullptr);
// 生产至第二块的第一个entry
@@ -210,8 +205,6 @@ TEST_F(bbq_head_cursor, produce_next_block) {
ut_produce_next_block(123);
ut_produce_next_block(456);
ut_produce_next_block(4095);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
void ut_produce_all_loop(uint32_t loop) {
@@ -223,7 +216,7 @@ void ut_produce_all_loop(uint32_t loop) {
int enqueue_data = TEST_DATA_MAGIC;
int dequeue_data = 0;
- q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
ASSERT_NE(q, nullptr);
for (uint32_t cnt = 0; cnt < loop; cnt++) {
@@ -266,8 +259,6 @@ TEST_F(bbq_head_cursor, produce_all_loop) {
ut_produce_all_loop(10);
ut_produce_all_loop(23);
ut_produce_all_loop(79);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
TEST_F(bbq_head_cursor, retry_new_full_empty) {
@@ -296,7 +287,7 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
// 满队再入队
for (uint32_t j = 0; j < entries_cnt / 3; j++) {
ret = bbq_enqueue_elem(q, &data[j]);
- EXPECT_TRUE(ret == BBQ_QUEUE_FULL);
+ EXPECT_TRUE(ret == BBQ_ERR_FULL);
}
if (i == 0) {
@@ -321,7 +312,7 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
// 空出队再出队
for (uint32_t j = 0; j < entries_cnt / 2; j++) {
ret = bbq_dequeue_elem(q, &tmp_data);
- EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY);
+ EXPECT_TRUE(ret == BBQ_ERR_EMPTY);
}
EXPECT_EQ(q->phead.load() & q->idx_mask, q->chead.load() & q->idx_mask);
@@ -336,8 +327,6 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
test_free(TEST_MODULE_UTEST, data);
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
bbq_destory(q);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
TEST_F(bbq_head_cursor, mpsc_faa) {
@@ -402,8 +391,6 @@ TEST_F(bbq_head_cursor, mpsc_faa) {
test_threads_destory(&test_info, threads);
EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring));
test_queue_destory(&q);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
TEST_F(bbq_head_cursor, drop_old_full_empty) {
@@ -414,7 +401,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) {
struct bbq *q;
int tmp_data = 0;
- q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
ASSERT_NE(q, nullptr);
EXPECT_TRUE(bbq_empty(q));
@@ -438,7 +425,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) {
// 空队再出队,失败
for (uint32_t i = 0; i < bn * bs; i++) {
ret = bbq_dequeue_elem(q, &tmp_data);
- EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret;
+ EXPECT_TRUE(ret == BBQ_ERR_EMPTY) << "ret " << ret;
}
expect_phead(q, bn - 1, j, __LINE__);
@@ -452,9 +439,6 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) {
}
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
bbq_destory(q);
-
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
@@ -468,7 +452,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
EXPECT_EQ(over_cnt / bs, 1);
int tmp_data = 0;
- q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
ASSERT_NE(q, nullptr);
EXPECT_TRUE(bbq_empty(q));
@@ -514,7 +498,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
for (uint32_t i = 0; i < bn * bs; i++) {
EXPECT_TRUE(bbq_empty(q));
ret = bbq_dequeue_elem(q, &tmp_data);
- EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret;
+ EXPECT_TRUE(ret == BBQ_ERR_EMPTY) << "ret " << ret;
}
expect_chead(q, 1, 0, __LINE__);
@@ -535,6 +519,4 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
EXPECT_TRUE(bbq_debug_check_array_bounds(q));
bbq_destory(q);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
} \ No newline at end of file
diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc
index daf6c97..fa2f07f 100644
--- a/bbq/tests/unittest/ut_mix.cc
+++ b/bbq/tests/unittest/ut_mix.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-19 22:49:24
+ * @LastEditTime: 2024-06-26 03:37:56
* @Describe: bbq除了队列操作外,其他函数的测试
*/
@@ -10,11 +10,11 @@ extern "C" {
#include "ut.h"
#include <math.h>
extern bool bbq_check_power_of_two(int n);
-extern uint32_t bbq_blocks_calc(uint32_t entries);
-extern unsigned ceil_log2(uint64_t x);
-extern uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd);
+extern unsigned bbq_ceil_log2(uint64_t x);
+extern uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd);
extern bool bbq_malloc_free_equal();
extern bool test_malloc_free_equal();
+extern int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs);
}
class bbq_mix : public testing::Test { // 继承了 testing::Test
@@ -48,17 +48,17 @@ void *fetch_max_thread_func(void *arg) {
uint64_t *ret = (uint64_t *)test_malloc(TEST_MODULE_UTEST, sizeof(*ret));
// 不同线程写入不同的>3的数
- *ret = fetch_max(&fetch_arg->data, pthread_self() + 3);
+ *ret = bbq_fetch_max(&fetch_arg->data, pthread_self() + 3);
pthread_exit(ret);
}
-TEST_F(bbq_mix, fetch_max) {
+TEST_F(bbq_mix, bbq_fetch_max) {
uint64_t ret = 0;
ut_fetch_arg arg = {};
arg.data.store(1); // 初始化1
arg.thread_cnt = 50;
- ret = fetch_max(&arg.data, 2); // max比较后设置为2
+ ret = bbq_fetch_max(&arg.data, 2); // max比较后设置为2
EXPECT_EQ(arg.data.load(), 2);
EXPECT_EQ(ret, 1);
@@ -80,8 +80,6 @@ TEST_F(bbq_mix, fetch_max) {
// EXPECT_EQ(eq_cnt, 1);
test_free(TEST_MODULE_UTEST, threads);
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
TEST_F(bbq_mix, power_of_two) {
@@ -105,30 +103,34 @@ TEST_F(bbq_mix, power_of_two) {
break;
}
}
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
-TEST_F(bbq_mix, bbq_blocks_calc) {
- uint32_t tmp = 0;
+TEST_F(bbq_mix, bbq_block_number_calc) {
+ uint32_t tmp = 2;
uint32_t max = pow(2, 32) - 1;
+ uint32_t bn = 0, bs = 0;
+ int ret = 0;
- tmp = 2;
- for (uint32_t val = 1; val < max; val *= tmp) {
+ ret = bbq_bnbs_calc(1, &bn, &bs);
+ EXPECT_EQ(ret, BBQ_ERR_OUT_OF_RANGE);
+
+ for (uint32_t val = 2; val < max; val *= tmp) {
+ ret = bbq_bnbs_calc(val, &bn, &bs);
+ EXPECT_EQ(ret, BBQ_OK);
if (val <= 128) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 2);
+ EXPECT_EQ(bn, 2);
} else if (val <= 2048) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 4);
+ EXPECT_EQ(bn, 4);
} else if (val <= 32768) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 8);
+ EXPECT_EQ(bn, 8);
} else if (val <= 524288) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 16);
+ EXPECT_EQ(bn, 16);
} else if (val <= 8388608) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 32);
+ EXPECT_EQ(bn, 32);
} else if (val <= 134217728) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 64);
+ EXPECT_EQ(bn, 64);
} else if (val <= 2147483648) {
- EXPECT_TRUE(bbq_blocks_calc(val) == 128);
+ EXPECT_EQ(bn, 128);
} else {
EXPECT_TRUE(0); // 异常
}
@@ -137,22 +139,4 @@ TEST_F(bbq_mix, bbq_blocks_calc) {
break;
}
}
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
-}
-
-TEST_F(bbq_mix, ceil_log2) {
- uint32_t tmp = 0;
- uint32_t max = pow(2, 32) - 1;
-
- tmp = 2;
- for (uint32_t val = 1, bbq_idx = 0; val < max; val *= tmp, bbq_idx++) {
- EXPECT_TRUE(bbq_idx == ceil_log2(val));
- if (val >= max / tmp) {
- break;
- }
- EXPECT_TRUE(log2(val) == ceil_log2(val));
- }
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
}
diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc
index 0d29b1c..bbe963b 100644
--- a/bbq/tests/unittest/ut_multit.cc
+++ b/bbq/tests/unittest/ut_multit.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-17 11:07:02
+ * @LastEditTime: 2024-06-25 11:30:59
* @Describe: TODO
*/
@@ -12,7 +12,6 @@ extern "C" {
#include "ut.h"
extern bool bbq_malloc_free_equal();
extern bool test_malloc_free_equal();
-extern void bbq_debug_memory_print();
bool bbq_debug_check_array_bounds(struct bbq *q);
}
diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c
index 26d75de..801cd3a 100644
--- a/perf/benchmark/bcm_benchmark.c
+++ b/perf/benchmark/bcm_benchmark.c
@@ -39,12 +39,12 @@ void bcm_report_printf(test_cfg *cfg, test_merge_data *data, test_exit_data **ra
// 多生产者单消费者 或 单生产者多消费才输出
if ((cfg->ring.producer_cnt == 1 && cfg->ring.consumer_cnt > 1) ||
(cfg->ring.producer_cnt > 1 && cfg->ring.consumer_cnt == 1)) {
- for (uint32_t i = 0, bbq_idx = 1; i < thread_cnt; i++) {
+ for (uint32_t i = 0, bbq_head_idx = 1; i < thread_cnt; i++) {
if (raw_data[i]->arg->ttype == ttype) {
test_time_metric tmp_time = test_clock_time_sub(raw_data[i]->metric_end, raw_data[i]->metric_start);
throughput = raw_data[i]->ok_cnt / test_clock_time_to_double(&tmp_time);
- printf(" %s-%d 吞吐 :%.0lf/s (%e/s)", name, bbq_idx, throughput, throughput);
- bbq_idx++;
+ printf(" %s-%d 吞吐 :%.0lf/s (%e/s)", name, bbq_head_idx, throughput, throughput);
+ bbq_head_idx++;
}
}
}
diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c
index f6bce9d..5cbcea7 100644
--- a/perf/benchmark/bcm_queue.c
+++ b/perf/benchmark/bcm_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-21 18:07:20
+ * @LastEditTime: 2024-06-25 14:14:40
* @Describe: TODO
*/
@@ -42,7 +42,7 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) {
q->ring = (void *)rte_ring_create("dpdk_ring", cfg->ring.entries_cnt, rte_socket_id(), flags);
if (q->ring == NULL) {
- return BBQ_NULL_PTR;
+ return BBQ_ERR_INPUT_NULL;
}
q->ring_free_f = (test_ring_free_f)rte_ring_free;
@@ -159,7 +159,7 @@ int test_queue_init_rmind(test_cfg *cfg, test_queue_s *q) {
int bcm_queue_init(test_cfg *cfg, test_queue_s *q) {
if (cfg == NULL || q == NULL) {
- return BBQ_NULL_PTR;
+ return BBQ_ERR_INPUT_NULL;
}
memset(q, 0, sizeof(*q));
@@ -176,7 +176,7 @@ int bcm_queue_init(test_cfg *cfg, test_queue_s *q) {
ret = test_queue_init_rmind(cfg, q);
break;
default:
- return BBQ_UNKNOWN_TYPE;
+ return BBQ_ERR;
}
return ret;