summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘煜 <[email protected]>2024-06-24 02:22:36 +0000
committer刘煜 <[email protected]>2024-06-24 02:22:36 +0000
commitcbaa973c4acad87f03d128bdbfe094f2e578f1f7 (patch)
treeb98a244b2face4aae9860a560ea8422721c6b857
parente26e626708bdf2e783545eddf9f93589367f264b (diff)
parente5ddd34ebba3a789fd4044085622710c30842c82 (diff)
Merge branch 'dev-if' into 'dev'
调整结构体和完善api接口 See merge request liuyu/bbq!5
-rw-r--r--bbq/include/bbq.h265
-rw-r--r--bbq/src/bbq.c560
-rw-r--r--bbq/test.c54
-rw-r--r--bbq/tests/common/test_queue.c31
-rw-r--r--bbq/tests/common/test_queue.h14
-rw-r--r--bbq/tests/unittest/ut.h2
-rw-r--r--bbq/tests/unittest/ut_example.cc181
-rw-r--r--bbq/tests/unittest/ut_head_cursor.cc162
-rw-r--r--bbq/tests/unittest/ut_mix.cc10
-rw-r--r--bbq/tests/unittest/ut_multit.cc6
-rw-r--r--perf/CMakeLists.txt4
-rw-r--r--perf/benchmark/bcm_benchmark.c8
-rw-r--r--perf/benchmark/bcm_queue.c24
13 files changed, 720 insertions, 601 deletions
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
index 0b7921c..56785fc 100644
--- a/bbq/include/bbq.h
+++ b/bbq/include/bbq.h
@@ -1,12 +1,11 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-06-18 14:16:54
+ * @LastEditTime: 2024-06-24 10:17:39
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
-#ifndef _BBQ_H_
-#define _BBQ_H_
+#pragma once
#include <stdbool.h>
#include <stdint.h>
@@ -25,9 +24,11 @@ using bbq_head = std::atomic<uint64_t>;
using aotmic_uint64 = std::atomic<uint64_t>;
#endif
+#define BBQ_SOCKET_ID_ANY -1
#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64)))
+#define BBQ_SYMBOL_MAX 64
-struct bbq_block_s {
+struct bbq_block {
bbq_cursor committed; // 已提交(version|offset)
bbq_cursor allocated; // 已分配(version|offset)
bbq_cursor reserved; // 已预留(version|offset)
@@ -35,165 +36,180 @@ struct bbq_block_s {
char *entries; // 存储大小可变的entry,分配空间大小:bs * entry_size
} __BBQ_CACHE_ALIGNED;
-struct bbq_s {
- size_t bs; // 每个block里entries成员的大小
- size_t bn; // 块(blocks)的个数
- size_t obj_size; // 入队数据的实际大小,int int* int**,都为sizeof(int)
- size_t entry_size; // blocks.entries里每个entry的大小
-
- int32_t socket_id; // 在哪个socket_id上使用libnuma分配内存,小于0将使用malloc分配
- uint32_t flags; // 标记:retry new 模式,还是drop old模式
- uint32_t idx_bits; // bbq_head里idx所占的位数
- uint32_t off_bits; // bbq_cursor里offset所占的位数
-
- uint64_t idx_mask; // idx_bits偏移后的掩码
- uint64_t off_mask; // off_bits偏移后的掩码
- bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
- bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
-
- struct bbq_block_s *blocks; // bn大小的数组
+struct bbq {
+ char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED;
+ int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
+ uint32_t bn; // blocks的个数
+ uint32_t bs; // blocks.entries的个数
+ uint32_t flags; // 标记:retry new 模式,还是drop old模式
+ uint32_t idx_bits; // bbq_head里idx所占的位数
+ uint32_t off_bits; // bbq_cursor里offset所占的位数
+ uint64_t idx_mask; // idx_bits偏移后的掩码
+ uint64_t off_mask; // off_bits偏移后的掩码
+ uint64_t entry_size; // blocks.entries里每个entry的大小
+ bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
+ bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
+ struct bbq_block *blocks; // bn大小的数组
};
-// 创建队列时flags可选参数
-// 第一位控制入队策略,默认是retry new
-#define BBQ_CREATE_F_DROP_OLD 0x0001 /**< 创建队列时设置为drop old模式(队列满时,覆盖旧数据,入队成功) */
-#define BBQ_CREATE_F_RETRY_NEW 0x0 /**< 默认为retry new模式(队列满时,当前入队失败) */
-
-// 第二位控制入队时的数据拷贝策略,默认是copy pointer
-#define BBQ_CREATE_F_COPY_VALUE 0x0002 /**< 创建队列时设置为拷贝数值 */
-#define BBQ_CREATE_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */
+#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
+// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */
+// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */
+#define BBQ_F_DEFAULT 0x0
+#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */
+#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */
+#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */
/**
- * 创建并返回bbq队列结构指针
+ * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。
+ * 对应入队函数:bbq_enqueue、bbq_enqueue_burst
+ * 对应出队函数:bbq_dequeue、bbq_dequeue_burst
*
+ * @param[in] name
+ * bbq名称
* @param[in] count
- * 队列所有entry的个数(所有块下entry个数的总和),count必须大于1,且是2的N次方。
- * 函数将会根据count自动计算出合理的块个数,并将entry平均分配到每个块里。
- * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。
- * @param[in] obj_size
- * 入队数据的实际大小,int int* int**,都为sizeof(int),因为在burst的时候会传入数组,
- * 需要根据数据大小来计算偏移,请正确设置该值。
- * @param[in] flags
- * 设置入队策略,通过BBQ_CREATE_F_XX系列宏定义设置flags
- * 1)第一位控制入队策略,默认是retry new模式(队列满了当前入队失败)。
- * 如果要设置为drop old模式,需要flags|BBQ_CREATE_F_DROP_OLD
- * 2)第二位控制数据入队列时,将"指针"入队,还是将"指针指向的数据"入队。默认入队"指针",
- * 如果把"指针指向的数据"入队,需要设置flags|BBQ_CREATE_F_COPY_VALUE
- * 3)可以同时设置多个flag,入BBQ_CREATE_F_DROP_OLD|BBQ_CREATE_F_COPY_VALUE
- * @return
- * 非NULL:消息队列结构体指针,用于后续出队、入队等操作。
- * NULL:创建失败。
- */
-extern struct bbq_s *bbq_create(uint32_t count, size_t obj_size, uint32_t flags);
-
-/**
- * 创建并返回bbq队列结构指针,支持多numa
- *
- * @param[in] count
- * 队列所有entry的个数(所有块下entry个数的总和),count必须大于1,且是2的N次方。
- * 函数将会根据count自动计算出合理的块个数,并将entry平均分配到每个块里。
- * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。
- * @param[in] flags
- * 设置入队策略,通过BBQ_CREATE_F_XX系列宏定义设置flags
- * 1)第一位控制入队策略,默认是retry new模式(队列满了当前入队失败)。
- * 如果要设置为drop old模式,需要flags|BBQ_CREATE_F_DROP_OLD
- * 2)第二位控制数据入队列时,将"指针"入队,还是将"指针指向的数据"入队。默认入队"指针",
- * 如果把"指针指向的数据"入队,需要设置flags|BBQ_CREATE_F_COPY_VALUE
- * 3)可以同时设置多个flag,入BBQ_CREATE_F_DROP_OLD|BBQ_CREATE_F_COPY_VALUE
- * @param[in] obj_size
- * 入队数据的实际大小,int int* int**,都为sizeof(int),因为在burst的时候会传入数组,
- * 需要根据数据大小来计算偏移,请正确设置该值。
+ * 队列所有entry的个数,count必须大于1,且是2的N次方。
* @param[in] socket_id
- * 多numa架构下,队列里的空间将针对指定socket调用libnuma库函数分配内存。
+ * 多numa架构下,调用libnuma库函数针对指定socket分配内存。
* 当检测到不支持多numa,将转为malloc分配内存。
+ * @param[in] flags
+ * 设置入队策略:
+ * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。
+ * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
* @return
* 非NULL:消息队列结构体指针,用于后续出队入队等操作。
* NULL:创建失败。
*/
-extern struct bbq_s *bbq_create_with_socket(uint32_t count, size_t obj_size, int socket_id, uint32_t flags);
+extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags);
/**
- * 用于释放消息队列,与bbq_create/bbq_create_with_socket函数成对。
+ * 消息队列单个指针入队
*
* @param[in] q
* 队列指针
+ * @param[in] data
+ * 则传入一维指针,如:
+ * int *data = malloc(sizeof(int));*data = 1; 传入&data
+ * @return
+ * 成功返回0,失败返回小于0的错误码。
*/
-extern void bbq_ring_free(struct bbq_s *q);
+extern int bbq_enqueue(struct bbq *q, void *const *data);
/**
- * 消息队列单个数据入队
+ * 消息队列单个指针出队
*
* @param[in] q
* 队列指针
- * @param[in] data
- * 创建队列时:
- * 1)如果flag设置了BBQ_CREATE_F_COPY_VALUE,则传入一维指针,如 int data,如:
- * int data,传入&data
- * int *data = malloc(sizeof(int));*data = 1; 传入data
- * 2)如果flag设置了BBQ_CREATE_F_COPY_PTR,则传入二维指针,如 int *data,如:
- * int *data = malloc(sizeof(int));*data = 1; 传入&data
+ * @param[out] data
+ * 则传入二维指针,如:
+ * int *data = NULL; 传入&data
* @return
* 成功返回0,失败返回小于0的错误码。
*/
-extern int bbq_enqueue(struct bbq_s *q, void *data);
+extern int bbq_dequeue(struct bbq *q, void **data);
/**
- * 消息队列批量入队(指针入队),尽可能一次入队n个数据,返回实际成功入队个数
+ * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数
*
* @param[in] q
* 队列指针
* @param[in] obj_table
- * 即将入队的指针数组,将数组里的每个成员(指针)入队
+ * 即将入队的指针数组,如:
+ * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT);
+ * for(int i=0;i<BUF_CNT;i++){
+ * obj_table[i] = malloc(sizeof(uint16_t));
+ * obj_table[i] = 1;
+ * }
+ * 传入obj_table
* @param[in] n
* 尝试一次入队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。
* @return
- * 返回实际成功入队个数
+ * 返回实际成功入队的个数
*/
-extern uint32_t bbq_enqueue_burst_ptr(struct bbq_s *q, void *const *obj_table, uint32_t n);
+extern uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed);
/**
- * 消息队列批量入队(数据入队),尽可能一次入队n个数据,返回实际成功入队个数
+ * 消息队列批量出队(指针出队),尽可能一次出队n个数据,返回实际成功出队个数
*
* @param[in] q
* 队列指针
- * @param[in] obj_table
- * 即将入队的数组,将数组里的每个成员入队
+ * @param[out] obj_table
+ * 存储出队的指针,如:
+ * uint16_t **obj_table = malloc(sizeof(uint16_t *))
+ * 传入obj_table
* @param[in] n
- * 尝试一次入队的个数
+ * 尝试一次出队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。
* @return
- * 返回实际成功入队个数
+ * 返回实际成功出队的个数
*/
-extern uint32_t bbq_enqueue_burst_value(struct bbq_s *q, void const *obj_table, uint32_t n);
+extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed);
+
+/**
+ * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针指向的数据拷贝入队。
+ * 对应入队函数:bbq_enqueue_elem、bbq_enqueue_burst_elem
+ * 对应出队函数:bbq_dequeue_elem、bbq_dequeue_burst_elem
+ *
+ * @param[in] name
+ * bbq名称
+ * @param[in] count
+ * 队列所有entry的个数,count必须大于1,且是2的N次方。
+ * @param[in] socket_id
+ * 多numa架构下,调用libnuma库函数针对指定socket分配内存。
+ * 当检测到不支持多numa,将转为malloc分配内存。
+ * @param[in] flags
+ * 设置入队策略:
+ * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。
+ * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
+ * @return
+ * 非NULL:消息队列结构体指针,用于后续出队入队等操作。
+ * NULL:创建失败。
+ */
+extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags);
+
+/**
+ * 消息队列单个数据入队(指针指向的数据将被拷贝)
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[in] data
+ * 传入一维指针,如:int data = 1; 传入&data
+ * @return
+ * 成功返回0,失败返回小于0的错误码。
+ */
+extern int bbq_enqueue_elem(struct bbq *q, void const *data);
/**
* 消息队列单个数据出队
+ *
* @param[in] q
- * 队列指针
+ * 队列指针
* @param[in] data
- * 创建队列时:
- * 1)如果flag设置了BBQ_CREATE_F_COPY_VALUE,则传入一维指针,如:
- * int data,传入&data
- * int *data = malloc(sizeof(int)),传入data
- * 2)如果flag设置了BBQ_CREATE_F_COPY_PTR,则传入二维指针,如:
- * int *data = NULL; 传入&data
+ * 则传入一维指针,如:int data; 传入&data
* @return
* 成功返回0,失败返回小于0的错误码。
*/
-extern int bbq_dequeue(struct bbq_s *q, void *data);
+extern int bbq_dequeue_elem(struct bbq *q, void *data);
/**
- * 消息队列批量出队(指针出队),尽可能一次出队n个数据,返回实际成功出队个数
+ * 消息队列批量入队(数据入队),尽可能一次入队n个数据,返回实际成功入队个数
*
* @param[in] q
* 队列指针
- * @param[out] obj_table
- * 存储出队的数据(指针)
+ * @param[in] obj_table
+ * 即将入队的数组,将数组里的每个成员入队,如:
+ * uint16_t obj_table[1024] = {初始化数据}; 传入obj_table
* @param[in] n
- * 尝试一次出队的个数
+ * 尝试一次入队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。
* @return
- * 返回实际成功出队个数
+ * 返回实际成功入队个数
*/
-extern uint32_t bbq_dequeue_burst_ptr(struct bbq_s *q, void **obj_table, uint32_t n);
+extern uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed);
/**
* 消息队列批量出队(数据出队),尽可能一次出队n个数据,返回实际成功出队个数
@@ -201,13 +217,43 @@ extern uint32_t bbq_dequeue_burst_ptr(struct bbq_s *q, void **obj_table, uint32_
* @param[in] q
* 队列指针
* @param[out] obj_table
- * 存储出队的数据
+ * 存储出队的数据,如:
+ * uint16_t obj_table[BUF_CNT] = {0}; 传入(void *)obj_table
* @param[in] n
* 尝试一次出队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。
* @return
* 返回实际成功出队个数
*/
-extern uint32_t bbq_dequeue_burst_value(struct bbq_s *q, void *obj_table, uint32_t n);
+extern uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed);
+
+/**
+ * 用于释放消息队列。
+ *
+ * @param[in] q
+ * 队列指针
+ * @return
+ * - true: 队列为空返回
+ * - false: 队列非空
+ */
+bool bbq_empty(struct bbq *q);
+
+/**
+ * 用于释放消息队列。
+ *
+ * @param[in] q
+ * 队列指针
+ */
+extern void bbq_destory(struct bbq *q);
+
+/**
+ * 打印消息队列信息(调试用)。
+ *
+ * @param[in] q
+ * 队列指针
+ */
+extern void bbq_debug_struct_print(struct bbq *q);
// 通用返回码
#define BBQ_OK 0 // 成功
@@ -217,8 +263,7 @@ extern uint32_t bbq_dequeue_burst_value(struct bbq_s *q, void *obj_table, uint32
#define BBQ_UNKNOWN_TYPE -3 // 未知类型
// 队列错误
-#define BBQ_QUEUE_FULL -1001 // 队列已满(入队失败)
-#define BBQ_QUEUE_BUSY -1002 // 队列忙碌中(入队或出队失败)
-#define BBQ_QUEUE_EMPTY -1003 // 队列已空(出队失败)
-
-#endif \ No newline at end of file
+#define BBQ_QUEUE_FULL -1001 // 队列已满(入队失败)
+#define BBQ_QUEUE_BUSY -1002 // 队列忙碌中(入队或出队失败)
+#define BBQ_QUEUE_EMPTY -1003 // 队列已空(出队失败)
+#define BBQ_QUEUE_DATA_ERR -1004 // 传入的数据异常 \ No newline at end of file
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index cab4234..d5bdf03 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-18 15:03:00
+ * @LastEditTime: 2024-06-24 10:11:24
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -11,86 +11,82 @@
#include <stdio.h>
#include <string.h>
-// -----------------------------日志宏定义-------------------------------
+// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针"
+#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */
+#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */
+
+// 判断flags标记位
+#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD)
+#define BBQ_F_CHK_VALUE(flags) (flags & BBQ_F_COPY_VALUE)
+
+// 避免无用参数的编译告警
+#define AVOID_WARNING(param) ((void)param)
+
#define BBQ_ERR_LOG(fmt, ...) \
do { \
printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \
} while (0)
-// -----------------------------其他宏定义-------------------------------
-#define AVOID_WARNING(param) ((void)param)
-#define BBQ_INVALID_SOCKET -1
-
-#define BBQ_POLIC_DROP_OLD(flags) (flags & BBQ_F_POLICY_DROP_OLD)
-#define BBQ_POLIC_RETRY_NEW(flags) (!(flags & BBQ_CREATE_F_DROP_OLD))
-
-#define BBQ_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE)
-#define BBQ_COPY_POINTER(flags) (!(flags & BBQ_CREATE_F_COPY_VALUE))
-
-// -----------------------------用于内存测试-------------------------------
-// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
-// #define BBQ_MEMORY
-enum bbq_module_e {
- BBQ_MODULE_QUEUE = 0,
- BBQ_MODULE_QUEUE_BLOCK_NB,
- BBQ_MODULE_QUEUE_BLOCK_ENTRY,
- BBQ_MODULE_MAX,
-};
-
struct bbq_status {
int32_t status; // 返回状态
uint32_t actual_burst; // 实际出/入队个数
};
-enum bbq_queue_state_e {
+enum bbq_queue_state {
BBQ_SUCCESS = 0,
- BBQ_BLOCK_DONE, // 当前块已的entry已用完,需要移动到下一个块
- BBQ_NO_ENTRY, // 没有条目可以使用
+ BBQ_BLOCK_DONE, // 当前块的entry已用完,需要移动到下一个块
+ BBQ_NO_ENTRY, // 队列里没有entry可以使用了
BBQ_NOT_AVAILABLE, // 当前块不可以用状态(将返回busy)
BBQ_ALLOCATED, // 已分配,返回entry信息
BBQ_RESERVED, // 已保留,返回entry信息
};
-struct bbq_entry_desc_s {
- uint64_t off; // entry在当前block的偏移(offset)
- uint64_t vsn; // allocated游标的版本(vsn)
- uint32_t actual_burst; // 实际出入队个数
- struct bbq_block_s *block; // 指向所在的block
+struct bbq_entry_desc {
+ uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释
+ uint64_t off; // entry在当前block的偏移(offset)
+ uint32_t actual_burst; // 实际出/入队个数
+ struct bbq_block *block; // 指向所在的block
};
struct bbq_queue_state_s {
- enum bbq_queue_state_e state; // 队列状态
- union {
- uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效
- struct bbq_entry_desc_s e; // state为ALLOCATED、RESERVED生效
+ enum bbq_queue_state state; // 队列状态
+ union { // TODO:
+ uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效
+ struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效
};
};
-extern inline uint64_t bbq_idx(struct bbq_s *q, uint64_t x) {
+extern inline uint64_t bbq_idx(struct bbq *q, uint64_t x) {
return x & q->idx_mask;
}
-extern inline uint64_t bbq_off(struct bbq_s *q, uint64_t x) {
+extern inline uint64_t bbq_off(struct bbq *q, uint64_t x) {
return x & q->off_mask;
}
-extern inline uint64_t bbq_head_vsn(struct bbq_s *q, uint64_t x) {
+extern inline uint64_t bbq_head_vsn(struct bbq *q, uint64_t x) {
return x >> q->idx_bits;
}
-extern inline uint64_t bbq_cur_vsn(struct bbq_s *q, uint64_t x) {
+extern inline uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x) {
return x >> q->off_bits;
}
-extern inline uint64_t set_cur_vsn(struct bbq_s *q, uint64_t ver) {
+extern inline uint64_t set_cur_vsn(struct bbq *q, uint64_t ver) {
return ver << q->off_bits;
}
-#ifdef BBQ_MEMORY
-#define BBQ_MEM_MAGIC 0xFF
-#endif
+// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
+// #define BBQ_MEMORY
+enum bbq_module {
+ BBQ_MODULE_QUEUE = 0,
+ BBQ_MODULE_QUEUE_BLOCK_NB,
+ BBQ_MODULE_QUEUE_BLOCK_ENTRY,
+ BBQ_MODULE_MAX,
+};
#ifdef BBQ_MEMORY
+#define BBQ_MEM_MAGIC 0xFF
struct bbq_memory_s {
aotmic_uint64 malloc_cnt;
aotmic_uint64 malloc_size;
@@ -100,7 +96,7 @@ struct bbq_memory_s {
struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0};
#endif
-void *bbq_malloc(enum bbq_module_e module, int socket_id, size_t size) {
+static void *bbq_malloc(enum bbq_module module, int socket_id, size_t size) {
void *ptr = NULL;
if (socket_id >= 0) {
ptr = numa_alloc_onnode(size, 0);
@@ -118,7 +114,7 @@ void *bbq_malloc(enum bbq_module_e module, int socket_id, size_t size) {
return ptr;
}
-void bbq_free(enum bbq_module_e module, int socket_id, void *ptr, size_t size) {
+static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t size) {
if (socket_id >= 0) {
numa_free(ptr, size);
} else {
@@ -137,9 +133,9 @@ void bbq_free(enum bbq_module_e module, int socket_id, void *ptr, size_t size) {
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) {
- uint64_t old_value;
+ uint64_t old_value = 0;
do {
- old_value = atomic_load(atom); // 读取当前值
+ old_value = atomic_load(atom);
} while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd));
return old_value;
@@ -165,7 +161,7 @@ uint32_t bbq_blocks_calc(uint32_t entries) {
}
/* 块初始化 */
-int block_init(struct bbq_s *q, struct bbq_block_s *block, bool cursor_init) {
+int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
#ifdef BBQ_MEMORY
// 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题
block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
@@ -192,10 +188,10 @@ int block_init(struct bbq_s *q, struct bbq_block_s *block, bool cursor_init) {
block->committed = ATOMIC_VAR_INIT(q->bs);
block->allocated = ATOMIC_VAR_INIT(q->bs);
block->reserved = ATOMIC_VAR_INIT(q->bs);
- if (BBQ_POLIC_RETRY_NEW(q->flags)) {
- block->consumed = ATOMIC_VAR_INIT(q->bs);
- } else {
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
block->consumed = ATOMIC_VAR_INIT(0);
+ } else {
+ block->consumed = ATOMIC_VAR_INIT(q->bs);
}
}
@@ -203,7 +199,7 @@ int block_init(struct bbq_s *q, struct bbq_block_s *block, bool cursor_init) {
}
/* 块清理函数,与block_init成对*/
-void block_cleanup(struct bbq_s *q, struct bbq_block_s *block) {
+void block_destory(struct bbq *q, struct bbq_block *block) {
if (block->entries) {
#ifdef BBQ_MEMORY
bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
@@ -235,8 +231,8 @@ unsigned ceil_log2(uint64_t x) {
return x == 1 ? 0 : floor_log2(x - 1) + 1;
}
-/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存,free_func先设置NULL */
-struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
+/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */
+static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
int ret = 0;
if (bbq_check_power_of_two(bn) == false) {
@@ -254,29 +250,29 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t
return NULL;
}
+ if (name == NULL || strlen(name) >= BBQ_SYMBOL_MAX - 1) {
+ BBQ_ERR_LOG("invalid bbq name, max len is %d", BBQ_SYMBOL_MAX - 1);
+ return NULL;
+ }
+
if (numa_available() < 0) {
// 不支持numa,设置
- socket_id = BBQ_INVALID_SOCKET;
+ socket_id = BBQ_SOCKET_ID_ANY;
}
- struct bbq_s *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q));
+ struct bbq *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q));
if (q == NULL) {
BBQ_ERR_LOG("malloc for bbq queue error");
return NULL;
}
memset(q, 0, sizeof(*q));
-
+ ret = snprintf(q->name, sizeof(q->name), "%s", name);
q->bn = bn;
q->bs = bs;
- q->obj_size = obj_size;
- if (BBQ_COPY_POINTER(flags)) {
- q->entry_size = sizeof(void *);
- } else {
- q->entry_size = obj_size;
- }
+ q->entry_size = obj_size;
q->socket_id = socket_id;
- q->phead = 0;
- q->chead = 0;
+ q->phead = ATOMIC_VAR_INIT(0);
+ q->chead = ATOMIC_VAR_INIT(0);
q->flags = flags;
q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks));
@@ -287,9 +283,9 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t
memset(q->blocks, 0, sizeof(*q->blocks));
for (uint32_t i = 0; i < bn; ++i) {
- // 第一个block不需要设置cursor_init_flag
- bool cursor_init_flag = (i == 0 ? false : true);
- ret = block_init(q, &(q->blocks[i]), cursor_init_flag);
+ // 第一个block不需要设置cursor_init
+ bool cursor_init = (i == 0 ? false : true);
+ ret = block_init(q, &(q->blocks[i]), cursor_init);
if (ret != BBQ_OK) {
BBQ_ERR_LOG("bbq block init error");
goto error;
@@ -297,7 +293,7 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t
}
q->idx_bits = ceil_log2(bn);
- q->off_bits = ceil_log2(bs) + 1;
+ q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位
q->idx_mask = (1 << q->idx_bits) - 1;
q->off_mask = (1 << q->off_bits) - 1;
@@ -305,17 +301,20 @@ struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t
return q;
error:
- bbq_ring_free(q);
+ bbq_destory(q);
return NULL;
}
-/* 创建消息队列,bn和bs必须是2的N次幂,free_func先设置NULL */
-struct bbq_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags) {
- return bbq_ring_create_bnbs_with_socket(bn, bs, obj_size, BBQ_INVALID_SOCKET, flags);
+struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) {
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
+}
+
+struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
}
/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */
-struct bbq_s *bbq_create_with_socket(uint32_t count, size_t obj_size, int socket_id, unsigned int flags) {
+struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) {
if (bbq_check_power_of_two(count) == false || count == 1) {
BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count);
return NULL;
@@ -323,74 +322,72 @@ struct bbq_s *bbq_create_with_socket(uint32_t count, size_t obj_size, int socket
uint32_t bn = bbq_blocks_calc(count);
uint32_t bs = count / bn;
- return bbq_ring_create_bnbs_with_socket(bn, bs, obj_size, socket_id, flags);
+
+ return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags);
}
-/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,free_func先设置NULL */
-struct bbq_s *bbq_create(uint32_t count, size_t obj_size, unsigned int flags) {
- // 传入无效socket_id,将使用malloc分配内存
- return bbq_create_with_socket(count, obj_size, BBQ_INVALID_SOCKET, flags);
+struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) {
+ if (bbq_check_power_of_two(count) == false || count == 1) {
+ BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count);
+ return NULL;
+ }
+
+ uint32_t bn = bbq_blocks_calc(count);
+ uint32_t bs = count / bn;
+
+ return bbq_create_bnbs(name, bn, bs, socket_id, flags);
}
/* 释放消息队列,与bbq_ring_create系列接口成对*/
-void bbq_ring_free(struct bbq_s *q) {
+void bbq_destory(struct bbq *q) {
if (q == NULL) {
return;
}
for (uint32_t i = 0; i < q->bn; ++i) {
- block_cleanup(q, &(q->blocks[i]));
+ block_destory(q, &(q->blocks[i]));
}
bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks));
bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q));
}
-// flags 第一位控制传入的是一维数组还是二维数组
-#define BBQ_F_SINGLE 0x0
-#define BBQ_F_ARRAY_1D 0x1
-#define BBQ_F_ARRAY_2D 0x2
-void commit_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void const *data, uint32_t flag) {
+#define BBQ_DATA_TYPE_SINGLE 0x0
+#define BBQ_DATA_TYPE_ARRAY_1D 0x1
+#define BBQ_DATA_TYPE_ARRAY_2D 0x2
+void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) {
size_t idx = e->off * q->entry_size;
- if (BBQ_COPY_POINTER(q->flags)) {
- // 指针入队列
- switch (flag) {
- case BBQ_F_ARRAY_1D: {
- char *tmp = (char *)data;
+ if (BBQ_F_CHK_VALUE(q->flags)) {
+ // 数据入队列
+ switch (data_type) {
+ case BBQ_DATA_TYPE_ARRAY_1D:
+ case BBQ_DATA_TYPE_SINGLE:
+ memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
+ break;
+ case BBQ_DATA_TYPE_ARRAY_2D: {
+ void **tmp = (void **)data;
char *entry = &(e->block->entries[idx]);
for (size_t i = 0; i < e->actual_burst; i++) {
- memcpy(entry, &tmp, q->entry_size);
+ memcpy(entry, *tmp, q->entry_size);
entry += q->entry_size;
- tmp += q->obj_size;
+ tmp++;
}
break;
}
- case BBQ_F_ARRAY_2D:
- case BBQ_F_SINGLE:
- // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0])
- memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
- break;
default:
break;
}
} else {
- // 数据入队列
- switch (flag) {
- case BBQ_F_ARRAY_1D:
- case BBQ_F_SINGLE:
+ // 指针入队列
+ switch (data_type) {
+ case BBQ_DATA_TYPE_ARRAY_2D:
+ case BBQ_DATA_TYPE_SINGLE:
+ // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0])
memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
break;
- case BBQ_F_ARRAY_2D: {
- void **tmp = (void **)data;
- char *entry = &(e->block->entries[idx]);
- for (size_t i = 0; i < e->actual_burst; i++) {
- memcpy(entry, *tmp, q->entry_size);
- entry += q->entry_size;
- tmp++;
- }
+ case BBQ_DATA_TYPE_ARRAY_1D:
break;
- }
default:
break;
}
@@ -398,7 +395,7 @@ void commit_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void const *data,
atomic_fetch_add(&e->block->committed, e->actual_burst);
}
-struct bbq_queue_state_s allocate_entry(struct bbq_s *q, struct bbq_block_s *block, uint32_t n) {
+struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
struct bbq_queue_state_s state = {0};
if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) {
state.state = BBQ_BLOCK_DONE;
@@ -431,13 +428,19 @@ struct bbq_queue_state_s allocate_entry(struct bbq_s *q, struct bbq_block_s *blo
return state;
}
-enum bbq_queue_state_e advance_phead(struct bbq_s *q, uint64_t ph) {
+enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
// 获取下一个block
uint64_t cur = 0;
- struct bbq_block_s *n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]);
+ struct bbq_block *n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]);
uint64_t ph_vsn = bbq_head_vsn(q, ph);
- if (BBQ_POLIC_RETRY_NEW(q->flags)) {
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ cur = atomic_load(&n_blk->committed);
+ // 生产者避免前进到上一轮中尚未完全提交的区块
+ if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) {
+ return BBQ_NOT_AVAILABLE;
+ }
+ } else {
cur = atomic_load(&n_blk->consumed);
uint64_t reserved;
uint64_t consumed_off = bbq_off(q, cur);
@@ -452,12 +455,6 @@ enum bbq_queue_state_e advance_phead(struct bbq_s *q, uint64_t ph) {
return BBQ_NOT_AVAILABLE;
}
}
- } else {
- cur = atomic_load(&n_blk->committed);
- // 生产者避免前进到上一轮中尚未完全提交的区块
- if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) {
- return BBQ_NOT_AVAILABLE;
- }
}
// 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。
@@ -470,8 +467,53 @@ enum bbq_queue_state_e advance_phead(struct bbq_s *q, uint64_t ph) {
return BBQ_SUCCESS;
}
+static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t *ph_ptr, struct bbq_block *blk_ph) {
+ uint64_t ch;
+ uint64_t ph;
+ if (ch_ptr != NULL) {
+ ch = *ch_ptr;
+ } else {
+ ch = atomic_load(&q->chead);
+ }
+
+ if (ph_ptr != NULL) {
+ ph = *ph_ptr;
+ } else {
+ ph = atomic_load(&q->phead);
+ }
+
+ uint64_t ph_idx = bbq_idx(q, ph);
+ uint64_t ch_idx = bbq_idx(q, ch);
+ uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed));
+
+ struct bbq_block *blk_ch = &(q->blocks[bbq_idx(q, ch)]);
+ uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved));
+
+ // "生产者"超过"消费者"的索引(即块)的个数
+ uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx;
+ if (!BBQ_F_CHK_DROP_OLD(q->flags)) {
+ // 这里idx_diff-1=-1也是正确。
+ return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
+ }
+
+ uint64_t ch_vsn = bbq_head_vsn(q, ch);
+ uint64_t ph_vsn = bbq_head_vsn(q, ph);
+
+ if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) {
+ return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
+ }
+
+ // 发生了覆盖
+ if (ph_idx == ch_idx) {
+ // 当前块以及之前已生产的都作废
+ return 0;
+ }
+
+ return (idx_diff - 1) * q->bs + committed_off;
+}
+
/* 消息队列入队 */
-static struct bbq_status __bbq_enqueue(struct bbq_s *q, void const *data, uint32_t n, uint32_t flag) {
+static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t flag, uint32_t *wait_consumed) {
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || data == NULL) {
@@ -482,7 +524,7 @@ static struct bbq_status __bbq_enqueue(struct bbq_s *q, void const *data, uint32
while (true) {
// 获取当前phead,转为索引后获取到当前的blk
uint64_t ph = atomic_load(&q->phead);
- struct bbq_block_s *blk = &(q->blocks[bbq_idx(q, ph)]);
+ struct bbq_block *blk = &(q->blocks[bbq_idx(q, ph)]);
struct bbq_queue_state_s state = allocate_entry(q, blk, n);
switch (state.state) {
@@ -490,38 +532,49 @@ static struct bbq_status __bbq_enqueue(struct bbq_s *q, void const *data, uint32
commit_entry(q, &state.e, data, flag);
ret.actual_burst = state.e.actual_burst;
ret.status = BBQ_OK;
- return ret;
+ break;
case BBQ_BLOCK_DONE: {
- enum bbq_queue_state_e pstate = advance_phead(q, ph);
- switch (pstate) {
- case BBQ_NO_ENTRY:
+ enum bbq_queue_state pstate = advance_phead(q, ph);
+ if (pstate == BBQ_SUCCESS) {
+ continue;
+ }
+
+ if (pstate == BBQ_NO_ENTRY) {
ret.status = BBQ_QUEUE_FULL;
- return ret;
- case BBQ_NOT_AVAILABLE:
+ } else if (pstate == BBQ_NOT_AVAILABLE) {
ret.status = BBQ_QUEUE_BUSY;
- return ret;
- case BBQ_SUCCESS:
- continue;
- default:
+ } else {
ret.status = BBQ_ERROR;
- return ret;
}
+
break;
}
default:
ret.status = BBQ_ERROR;
- return ret;
+ break;
}
+
+ if (wait_consumed != NULL) {
+ *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk);
+ }
+
+ return ret;
}
}
-int bbq_enqueue(struct bbq_s *q, void *data) {
- struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_F_SINGLE);
+int bbq_enqueue(struct bbq *q, void *const *data) {
+ struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
+ return ret.status;
+}
+
+int bbq_enqueue_elem(struct bbq *q, void const *data) {
+ struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
/* 更新成功 reserve成功的个数 */
uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) {
+ // TODO:逻辑可以合并
if (n == 1) {
// fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
if (fetch_max(aotmic, reserved + 1) == reserved) {
@@ -535,7 +588,7 @@ uint32_t reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) {
}
}
-struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *block, uint32_t n) {
+struct bbq_queue_state_s reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
while (true) {
struct bbq_queue_state_s state;
uint64_t reserved = atomic_load(&block->reserved);
@@ -544,7 +597,9 @@ struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *bloc
if (reserved_off < q->bs) {
uint64_t consumed = atomic_load(&block->consumed);
- if (BBQ_POLIC_RETRY_NEW(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) {
+ // TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理?
+ // TODO:这个情况可能出现?
+ if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) {
// consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到
state.state = BBQ_BLOCK_DONE;
state.vsn = reserved_svn;
@@ -553,7 +608,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *bloc
uint64_t committed = atomic_load(&block->committed);
uint64_t committed_off = bbq_off(q, committed);
- if (committed_off == reserved_off) { // TODO:多entry关注
+ if (committed_off == reserved_off) {
state.state = BBQ_NO_ENTRY;
return state;
}
@@ -589,26 +644,16 @@ struct bbq_queue_state_s reserve_entry(struct bbq_s *q, struct bbq_block_s *bloc
}
}
-bool consume_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void *deq_data, uint32_t flag) {
+bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) {
size_t idx = e->off * q->entry_size;
- if (BBQ_COPY_POINTER(q->flags)) {
- switch (flag) {
- case BBQ_F_ARRAY_2D:
- case BBQ_F_SINGLE:
- memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
- break;
- case BBQ_F_ARRAY_1D:
- default:
- break;
- }
- } else {
- switch (flag) {
- case BBQ_F_ARRAY_1D:
- case BBQ_F_SINGLE:
+ if (BBQ_F_CHK_VALUE(q->flags)) {
+ switch (data_type) {
+ case BBQ_DATA_TYPE_ARRAY_1D:
+ case BBQ_DATA_TYPE_SINGLE:
memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
break;
- case BBQ_F_ARRAY_2D: {
+ case BBQ_DATA_TYPE_ARRAY_2D: {
void **tmp = (void **)deq_data;
char *entry = &(e->block->entries[idx]);
for (size_t i = 0; i < e->actual_burst; i++) {
@@ -621,31 +666,47 @@ bool consume_entry(struct bbq_s *q, struct bbq_entry_desc_s *e, void *deq_data,
default:
break;
}
+ } else {
+ switch (data_type) {
+ case BBQ_DATA_TYPE_ARRAY_2D:
+ case BBQ_DATA_TYPE_SINGLE:
+ memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
+ break;
+ case BBQ_DATA_TYPE_ARRAY_1D:
+ default:
+ break;
+ }
}
uint64_t allocated;
- if (BBQ_POLIC_RETRY_NEW(q->flags)) {
- atomic_fetch_add(&e->block->consumed, e->actual_burst);
- } else {
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
// TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head?
allocated = atomic_load(&e->block->allocated);
// 预留的entry所在的块,已经被新生产的数据赶上了
if (bbq_cur_vsn(q, allocated) != e->vsn) {
return false;
}
+ } else {
+ atomic_fetch_add(&e->block->consumed, e->actual_burst);
}
return true;
}
-bool advance_chead(struct bbq_s *q, uint64_t ch, uint64_t ver) {
+bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
uint64_t ch_idx = bbq_idx(q, ch);
- struct bbq_block_s *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
+ struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
uint64_t ch_vsn = bbq_head_vsn(q, ch);
uint64_t committed = atomic_load(&n_blk->committed);
uint64_t committed_vsn = bbq_cur_vsn(q, committed);
- if (BBQ_POLIC_RETRY_NEW(q->flags)) {
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序.
+ // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1
+ if (committed_vsn < ver + (ch_idx == 0))
+ return false;
+ fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn));
+ } else {
if (committed_vsn != ch_vsn + 1) {
// 消费者追上了生产者,下一块还未开始生产
return false;
@@ -653,12 +714,6 @@ bool advance_chead(struct bbq_s *q, uint64_t ch, uint64_t ver) {
uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1);
fetch_max(&n_blk->consumed, new_vsn);
fetch_max(&n_blk->reserved, new_vsn);
- } else {
- // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序.
- // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1
- if (committed_vsn < ver + (ch_idx == 0))
- return false;
- fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn));
}
fetch_max(&q->chead, ch + 1);
@@ -666,7 +721,7 @@ bool advance_chead(struct bbq_s *q, uint64_t ch, uint64_t ver) {
}
/* 消息队列出队 */
-static struct bbq_status __bbq_dequeue(struct bbq_s *q, void *deq_data, uint32_t n, uint32_t flag) {
+static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) {
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || deq_data == NULL) {
ret.status = BBQ_NULL_PTR;
@@ -675,46 +730,54 @@ static struct bbq_status __bbq_dequeue(struct bbq_s *q, void *deq_data, uint32_t
while (true) {
uint64_t ch = atomic_load(&q->chead);
- struct bbq_block_s *blk = &(q->blocks[bbq_idx(q, ch)]);
-
+ struct bbq_block *blk = &(q->blocks[bbq_idx(q, ch)]);
struct bbq_queue_state_s state;
state = reserve_entry(q, blk, n);
switch (state.state) {
case BBQ_RESERVED:
- if (consume_entry(q, &state.e, deq_data, flag)) {
- ret.status = BBQ_OK;
- ret.actual_burst = state.e.actual_burst;
- return ret;
- } else {
+ if (!consume_entry(q, &state.e, deq_data, data_type)) {
continue;
}
+ ret.status = BBQ_OK;
+ ret.actual_burst = state.e.actual_burst;
+ break;
case BBQ_NO_ENTRY:
ret.status = BBQ_QUEUE_EMPTY;
- return ret;
+ break;
case BBQ_NOT_AVAILABLE:
ret.status = BBQ_QUEUE_BUSY;
- return ret;
+ break;
case BBQ_BLOCK_DONE:
if (advance_chead(q, ch, state.vsn)) {
continue;
- } else {
- ret.status = BBQ_QUEUE_EMPTY;
- return ret;
}
+ ret.status = BBQ_QUEUE_EMPTY;
+ break;
default:
ret.status = BBQ_ERROR;
- return ret;
+ break;
}
+
+ if (wait_consumed != NULL) {
+ *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk);
+ }
+
+ return ret;
}
}
-int bbq_dequeue(struct bbq_s *q, void *deq_data) {
- struct bbq_status ret = __bbq_dequeue(q, deq_data, 1, BBQ_F_SINGLE);
+int bbq_dequeue(struct bbq *q, void **data) {
+ struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
+ return ret.status;
+}
+
+int bbq_dequeue_elem(struct bbq *q, void *data) {
+ struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
-uint32_t bbq_max_burst(struct bbq_s *q, uint32_t n) {
+uint32_t bbq_max_burst(struct bbq *q, uint32_t n) {
uint32_t burst = n;
if (burst > q->bs) {
burst = q->bs;
@@ -723,11 +786,15 @@ uint32_t bbq_max_burst(struct bbq_s *q, uint32_t n) {
return burst;
}
-static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq_s *q, void *obj_table, uint32_t n) {
+static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
return BBQ_NULL_PTR;
}
+ if (!BBQ_F_CHK_VALUE(q->flags)) {
+ return BBQ_QUEUE_DATA_ERR;
+ }
+
uint32_t burst = 0;
uint32_t ready = 0;
void *obj = obj_table;
@@ -735,18 +802,18 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq_s *q, void *obj_tab
while (ready < n) {
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_dequeue(q, obj, burst, BBQ_F_ARRAY_1D);
+ ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed);
if (ret.status != BBQ_OK) {
break;
}
- obj += q->obj_size * ret.actual_burst;
+ obj += q->entry_size * ret.actual_burst;
ready += ret.actual_burst;
}
return ready;
}
-static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq_s *q, void **obj_table, uint32_t n) {
+static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
return BBQ_NULL_PTR;
}
@@ -758,7 +825,7 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq_s *q, void **obj_ta
while (ready < n) {
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D);
+ ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed);
if (ret.status != BBQ_OK) {
break;
}
@@ -770,11 +837,15 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq_s *q, void **obj_ta
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst_one_dimensional(struct bbq_s *q, void const *obj_table, uint32_t n) {
+uint32_t bbq_enqueue_burst_one_dimensional(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
return BBQ_NULL_PTR;
}
+ if (!BBQ_F_CHK_VALUE(q->flags)) {
+ return BBQ_QUEUE_DATA_ERR;
+ }
+
uint32_t burst = 0;
uint32_t ready = 0;
void const *obj = obj_table;
@@ -782,11 +853,11 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq_s *q, void const *obj_tabl
while (ready < n) {
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_enqueue(q, obj, burst, BBQ_F_ARRAY_1D);
+ ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed);
if (ret.status != BBQ_OK) {
break;
}
- obj += q->obj_size * ret.actual_burst;
+ obj += q->entry_size * ret.actual_burst;
ready += ret.actual_burst;
}
@@ -794,7 +865,7 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq_s *q, void const *obj_tabl
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst_two_dimensional(struct bbq_s *q, void *const *obj_table, uint32_t n) {
+uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
if (q == NULL || obj_table == NULL) {
return BBQ_NULL_PTR;
}
@@ -806,7 +877,7 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq_s *q, void *const *obj_tab
while (ready < n) {
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D);
+ ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed);
if (ret.status != BBQ_OK) {
break;
}
@@ -817,28 +888,60 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq_s *q, void *const *obj_tab
return ready;
}
-uint32_t bbq_enqueue_burst_value(struct bbq_s *q, void const *obj_table, uint32_t n) {
- return bbq_enqueue_burst_one_dimensional(q, obj_table, n);
+bool bbq_empty(struct bbq *q) {
+ uint64_t phead = atomic_load(&q->phead);
+ uint64_t chead = atomic_load(&q->chead);
+
+ uint64_t ph_vsn = bbq_head_vsn(q, phead);
+ uint64_t ch_vsn = bbq_head_vsn(q, chead);
+ uint64_t ph_idx = bbq_idx(q, phead);
+ uint64_t ch_idx = bbq_idx(q, chead);
+
+ struct bbq_block *block;
+
+ if (ph_idx != ch_idx) {
+ return false;
+ }
+
+ block = &q->blocks[ph_idx];
+ if (ph_vsn == ch_vsn) {
+ if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) {
+ return true;
+ }
+ }
+
+ bbq_cursor reserved = atomic_load(&block->reserved);
+ uint64_t reserved_off = bbq_off(q, reserved);
+
+ if (BBQ_F_CHK_DROP_OLD(q->flags) &&
+ ph_vsn > ch_vsn &&
+ reserved_off != q->bs) {
+ // 生产者追上了消费者,当前块以及未消费的全部
+ // 如果reserved指向当前块的最后一个entry,可以移动head消费下一块,否则返回空
+ return true;
+ }
+
+ return false;
}
-uint32_t bbq_enqueue_burst_value_two_dimensional(struct bbq_s *q, void *const *obj_table, uint32_t n) {
- return bbq_enqueue_burst_two_dimensional(q, obj_table, n);
+uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_enqueue_burst_ptr(struct bbq_s *q, void *const *obj_table, uint32_t n) {
- return bbq_enqueue_burst_two_dimensional(q, obj_table, n);
+uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_enqueue_burst_ptr_one_dimensional(struct bbq_s *q, void const *obj_table, uint32_t n) {
- return bbq_enqueue_burst_one_dimensional(q, obj_table, n);
+uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_dequeue_burst_ptr(struct bbq_s *q, void **obj_table, uint32_t n) {
- return bbq_dequeue_burst_two_dimensional(q, obj_table, n);
+uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_dequeue_burst_value(struct bbq_s *q, void *obj_table, uint32_t n) {
- return bbq_dequeue_burst_one_dimensional(q, obj_table, n);
+uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed);
}
bool bbq_malloc_free_equal() {
@@ -865,7 +968,7 @@ bool bbq_malloc_free_equal() {
#endif
}
-bool bbq_check_array_bounds(struct bbq_s *q) {
+bool bbq_debug_check_array_bounds(struct bbq *q) {
#ifdef BBQ_MEMORY
void *value = malloc(q->entry_size);
memset(value, BBQ_MEM_MAGIC, q->entry_size);
@@ -883,7 +986,7 @@ bool bbq_check_array_bounds(struct bbq_s *q) {
return true;
}
-void bbq_memory_print() {
+void bbq_debug_memory_print() {
#ifdef BBQ_MEMORY
for (int i = 0; i < BBQ_MODULE_MAX; i++) {
uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt);
@@ -905,25 +1008,36 @@ void bbq_memory_print() {
#endif
}
-#if 1
-// 调试用
-void bbq_block_print(struct bbq_s *q, struct bbq_block_s *block) {
+void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
bbq_cursor allocated = atomic_load(&block->allocated);
bbq_cursor committed = atomic_load(&block->committed);
bbq_cursor reserved = atomic_load(&block->reserved);
bbq_cursor consumed = atomic_load(&block->consumed);
- printf(" allocated:%lu\n", bbq_off(q, allocated));
- printf(" committed:%lu\n", bbq_off(q, committed));
- printf(" reserved:%lu\n", bbq_off(q, reserved));
- printf(" consumed:%lu\n\n", bbq_off(q, consumed));
+ printf(" allocated:%lu committed:%lu reserved:%lu",
+ bbq_off(q, allocated), bbq_off(q, committed), bbq_off(q, reserved));
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ printf("\n");
+ } else {
+ printf(" consumed:%lu\n", bbq_off(q, consumed));
+ }
}
-void bbq_struct_print(struct bbq_s *q) {
- printf("-------------\n");
- printf("ph idx:%lu vsn:%lu\n", bbq_idx(q, q->phead), bbq_head_vsn(q, q->phead));
- bbq_block_print(q, &(q->blocks[bbq_idx(q, q->phead)]));
+void bbq_debug_struct_print(struct bbq *q) {
+ printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new");
+ uint64_t phead = atomic_load(&q->phead);
+ uint64_t chead = atomic_load(&q->chead);
- printf("ch idx:%lu vsn:%lu", bbq_idx(q, q->chead), bbq_head_vsn(q, q->chead));
- bbq_block_print(q, &(q->blocks[bbq_idx(q, q->chead)]));
-}
-#endif \ No newline at end of file
+ printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs);
+ printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead));
+
+ uint64_t ph_idx = bbq_idx(q, phead);
+ uint64_t ch_idx = bbq_idx(q, chead);
+ if (ph_idx != ch_idx) {
+ printf("block[%lu]\n", ph_idx);
+ bbq_debug_block_print(q, &(q->blocks[ph_idx]));
+ }
+
+ printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead));
+ printf("block[%lu]\n", ch_idx);
+ bbq_debug_block_print(q, &(q->blocks[ch_idx]));
+} \ No newline at end of file
diff --git a/bbq/test.c b/bbq/test.c
deleted file mode 100644
index 7fa8ecf..0000000
--- a/bbq/test.c
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * @Description: 描述信息
- * @Date: 2024-06-13 21:45:38
- * @LastEditTime: 2024-06-17 18:28:18
- */
-#include <stdint.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-// 假设我们有一些uint16_t的指针,并且我们想要复制它们的地址
-void copy_pointer_array(void **dest, uint16_t **src, size_t num_pointers) {
- // 使用memcpy复制指针数组的内容
- memcpy(dest, src, num_pointers * sizeof(uint16_t *));
-}
-
-// 一个简单的函数来展示如何使用这些指针
-void print_through_pointers(void *entries, size_t num_pointers) {
- uint16_t **pointers = (uint16_t **)entries;
- for (size_t i = 0; i < num_pointers; ++i) {
- if (pointers[i]) {
- printf("Pointer %zu points to %p, value: %u\n", i, (void *)pointers[i], *pointers[i]);
- } else {
- printf("Pointer %zu is NULL\n", i);
- }
- }
-}
-
-struct test_s {
- void *entries;
-};
-
-int main() {
- // 示例:创建一些uint16_t的指针和值
- uint16_t val1 = 123, val2 = 456, val3 = 789;
- uint16_t *ptrs[3] = {&val1, &val2, &val3};
-
- printf("%p\n", ptrs[0]);
- printf("%p\n", ptrs[1]);
- printf("%p\n", ptrs[2]);
-
- // 使用void指针进行复制
- struct test_s tt;
- tt.entries = malloc(3 * sizeof(uint16_t *));
- copy_pointer_array(tt.entries, (uint16_t **)ptrs, 3);
-
- // 通过void指针读取数据
- print_through_pointers(tt.entries, 3);
-
- // 清理内存
- free(tt.entries);
-
- return 0;
-} \ No newline at end of file
diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c
index 9a96e33..f4726d9 100644
--- a/bbq/tests/common/test_queue.c
+++ b/bbq/tests/common/test_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-18 04:11:12
+ * @LastEditTime: 2024-06-24 10:16:28
* @Describe: TODO
*/
@@ -10,25 +10,22 @@
#include "test_mix.h"
#include <sys/prctl.h>
#include <unistd.h>
-extern bool bbq_check_array_bounds(struct bbq_s *q);
-extern struct bbq_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags);
-extern struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags);
+extern bool bbq_debug_check_array_bounds(struct bbq *q);
+extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
+extern struct bbq *bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags);
-uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx) {
+uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
TEST_AVOID_WARNING(thread_idx);
- return bbq_enqueue_burst_ptr(ring, obj_table, n);
+ return bbq_enqueue_burst(ring, obj_table, n, wait_consumed);
}
int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
- size_t obj_size = sizeof(test_data);
-
if (cfg->ring.block_count == 0) {
- q->ring = bbq_create_with_socket(cfg->ring.entries_cnt, obj_size, 0, BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR);
+ q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
} else {
- q->ring = bbq_ring_create_bnbs_with_socket(cfg->ring.block_count,
- cfg->ring.entries_cnt / cfg->ring.block_count,
- obj_size,
- 0, BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR);
+ q->ring = bbq_create_bnbs("test_bbq", cfg->ring.block_count,
+ cfg->ring.entries_cnt / cfg->ring.block_count,
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
}
if (q->ring == NULL) {
@@ -36,11 +33,11 @@ int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
return BBQ_NULL_PTR;
}
- q->ring_free_f = (test_ring_free_f)bbq_ring_free;
+ q->ring_free_f = (test_ring_free_f)bbq_destory;
q->enqueue_f = (test_ring_enqueue_f)bbq_enqueue;
q->dequeue_f = (test_ring_dequeue_f)bbq_dequeue;
- q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst_ptr;
q->enqueue_burst_f = (test_enqueue_burst_f)test_bbq_enqueue_burst;
+ q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst;
return 0;
}
@@ -110,7 +107,7 @@ void test_data_destory(test_data **data, size_t cnt) {
uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) {
uint32_t enqueue_cnt = 0;
test_time_metric op_use_start = test_clock_time_get();
- enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx);
+ enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, NULL);
*op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start);
return enqueue_cnt;
@@ -120,7 +117,7 @@ uint32_t test_exec_dequeue(test_queue_s *q, test_data **data, size_t burst_cnt,
uint32_t dequeue_cnt = 0;
test_time_metric op_use_start = test_clock_time_get();
- dequeue_cnt = q->dequeue_burst_f(q->ring, (void **)data, burst_cnt);
+ dequeue_cnt = q->dequeue_burst_f(q->ring, (void **)data, burst_cnt, NULL);
*op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start);
return dequeue_cnt;
diff --git a/bbq/tests/common/test_queue.h b/bbq/tests/common/test_queue.h
index 723f0b3..8d40e92 100644
--- a/bbq/tests/common/test_queue.h
+++ b/bbq/tests/common/test_queue.h
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-17 18:17:24
+ * @LastEditTime: 2024-06-21 18:05:44
* @Describe: TODO
*/
@@ -15,8 +15,8 @@
typedef void (*test_ring_free_f)(void *ring);
typedef int (*test_ring_enqueue_f)(void *ring, void *obj);
typedef int (*test_ring_dequeue_f)(void *ring, void *obj);
-typedef uint32_t (*test_enqueue_burst_f)(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx);
-typedef uint32_t (*test_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n);
+typedef uint32_t (*test_enqueue_burst_f)(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed);
+typedef uint32_t (*test_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n, uint32_t *wait_consumed);
typedef bool (*test_ring_empty_f)(void *ring);
typedef struct {
@@ -91,10 +91,10 @@ extern void test_wait_all_threads_ready(test_ctl *ctl);
extern void test_queue_destory(test_queue_s *q);
extern int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q);
extern void test_merge_all_data(test_exit_data **exit_data, uint32_t thread_cnt, test_merge_s *merge);
-extern uint64_t bbq_idx(struct bbq_s *q, uint64_t x);
-extern uint64_t bbq_off(struct bbq_s *q, uint64_t x);
-extern uint64_t bbq_head_vsn(struct bbq_s *q, uint64_t x);
-extern uint64_t bbq_cur_vsn(struct bbq_s *q, uint64_t x);
+extern uint64_t bbq_idx(struct bbq *q, uint64_t x);
+extern uint64_t bbq_off(struct bbq *q, uint64_t x);
+extern uint64_t bbq_head_vsn(struct bbq *q, uint64_t x);
+extern uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x);
extern test_data **test_data_create(size_t cnt);
extern void test_data_destory(test_data **data, size_t cnt);
#endif \ No newline at end of file
diff --git a/bbq/tests/unittest/ut.h b/bbq/tests/unittest/ut.h
index b36c2ee..a6ef9ad 100644
--- a/bbq/tests/unittest/ut.h
+++ b/bbq/tests/unittest/ut.h
@@ -17,7 +17,7 @@ typedef struct {
} testdata_s;
typedef struct {
- struct bbq_s *q;
+ struct bbq *q;
uint32_t usleep; // 该线程每次执行间隔睡眠时间
bool until_end; // 循环读取,直到队列空或满
uint64_t thread_exec_times; // 每个线程生产/消费次数
diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc
index fe78e03..64ee3f6 100644
--- a/bbq/tests/unittest/ut_example.cc
+++ b/bbq/tests/unittest/ut_example.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-18 15:11:23
+ * @LastEditTime: 2024-06-24 10:13:21
* @Describe: 简单的测试用例,测试基本功能
*/
@@ -11,11 +11,10 @@ extern "C" {
#include "test_queue.h"
#include "ut.h"
extern bool bbq_malloc_free_equal();
-extern void bbq_memory_print();
-extern bool bbq_check_array_bounds(struct bbq_s *q);
-extern void bbq_struct_print(struct bbq_s *q);
-extern uint32_t bbq_enqueue_burst_ptr_one_dimensional(struct bbq_s *q, void const *obj_table, uint32_t n);
-extern uint32_t bbq_enqueue_burst_value_two_dimensional(struct bbq_s *q, void *const *obj_table, uint32_t n);
+extern void bbq_debug_memory_print();
+extern bool bbq_debug_check_array_bounds(struct bbq *q);
+extern void bbq_struct_print(struct bbq *q);
+extern uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed);
}
#define BUF_CNT 4096
@@ -54,22 +53,22 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) {
uint16_t *deq_data = NULL;
// 创建队列
- struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR);
- EXPECT_TRUE(q);
+ struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
// 全部入队成功
for (uint32_t i = 0; i < 4000; i++) {
- if (bbq_enqueue(q, &enq_table1[i]) == 0) {
+ if (bbq_enqueue(q, (void **)&enq_table1[i]) == 0) {
cnt++;
}
}
// 部分入队成功
for (uint32_t i = 0; i < 4000; i++) {
- if (bbq_enqueue(q, &enq_table2[i]) == 0) {
+ if (bbq_enqueue(q, (void **)&enq_table2[i]) == 0) {
cnt++;
}
}
@@ -79,7 +78,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) {
cnt = 0;
for (uint32_t i = 0; i < BUF_CNT; i++) {
- ret = bbq_dequeue(q, &deq_data);
+ ret = bbq_dequeue(q, (void **)&deq_data);
if (ret == 0) {
EXPECT_EQ(*deq_data, TEST_DATA_MAGIC);
cnt++;
@@ -89,7 +88,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) {
EXPECT_EQ(cnt, BUF_CNT);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
}
TEST_F(bbq_example, single_retry_new_cp_value) {
@@ -98,22 +97,22 @@ TEST_F(bbq_example, single_retry_new_cp_value) {
uint16_t deq_data;
// 创建队列
- struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_VALUE);
- EXPECT_TRUE(q);
+ struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
// 全部入队成功
for (uint32_t i = 0; i < 4000; i++) {
- if (bbq_enqueue(q, enq_table1[i]) == 0) {
+ if (bbq_enqueue(q, (void **)enq_table1[i]) == 0) {
cnt++;
}
}
// 部分入队成功
for (uint32_t i = 0; i < 4000; i++) {
- if (bbq_enqueue(q, enq_table2[i]) == 0) {
+ if (bbq_enqueue_elem(q, enq_table2[i]) == 0) {
cnt++;
}
}
@@ -123,7 +122,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) {
cnt = 0;
for (uint32_t i = 0; i < BUF_CNT; i++) {
- ret = bbq_dequeue(q, &deq_data);
+ ret = bbq_dequeue_elem(q, &deq_data);
if (ret == 0) {
EXPECT_EQ(deq_data, TEST_DATA_MAGIC);
cnt++;
@@ -133,7 +132,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) {
EXPECT_EQ(cnt, BUF_CNT);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY);
}
TEST_F(bbq_example, single_drop_old_cp_pointer) {
@@ -144,18 +143,18 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) {
uint64_t second_cnt = 1000;
// 创建队列
- struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_PTR);
- EXPECT_TRUE(q);
+ struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ ASSERT_NE(q, nullptr);
EXPECT_LT(second_cnt, q->bs * q->bn);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
// 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈)
uint32_t loop = 3;
for (uint32_t n = 0; n < loop; n++) {
for (uint32_t i = 0; i < first_cnt; i++) {
- ret = bbq_enqueue(q, &enq_table1[i]);
+ ret = bbq_enqueue(q, (void **)&enq_table1[i]);
if (ret == 0) {
cnt++;
}
@@ -166,7 +165,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) {
// 全部入队成功
cnt = 0;
for (uint32_t i = 0; i < second_cnt; i++) {
- if (bbq_enqueue(q, &enq_table2[i]) == 0) {
+ if (bbq_enqueue(q, (void **)&enq_table2[i]) == 0) {
cnt++;
}
}
@@ -174,7 +173,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) {
cnt = 0;
for (uint32_t i = 0; i < BUF_CNT; i++) {
- ret = bbq_dequeue(q, &deq_data);
+ ret = bbq_dequeue(q, (void **)&deq_data);
if (ret == 0) {
EXPECT_EQ(*deq_data, TEST_DATA_MAGIC);
cnt++;
@@ -185,7 +184,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) {
EXPECT_EQ(cnt, second_cnt - q->bs);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_QUEUE_EMPTY);
}
TEST_F(bbq_example, single_drop_old_cp_value) {
@@ -196,18 +195,18 @@ TEST_F(bbq_example, single_drop_old_cp_value) {
uint64_t second_cnt = 1000;
// 创建队列
- struct bbq_s *q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_VALUE);
- EXPECT_TRUE(q);
+ struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ ASSERT_NE(q, nullptr);
EXPECT_LT(second_cnt, q->bs * q->bn);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY);
// 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈)
uint32_t loop = 3;
for (uint32_t n = 0; n < loop; n++) {
for (uint32_t i = 0; i < first_cnt; i++) {
- ret = bbq_enqueue(q, enq_table1[i]);
+ ret = bbq_enqueue_elem(q, enq_table1[i]);
if (ret == 0) {
cnt++;
}
@@ -218,7 +217,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) {
// 全部入队成功
cnt = 0;
for (uint32_t i = 0; i < second_cnt; i++) {
- if (bbq_enqueue(q, enq_table2[i]) == 0) {
+ if (bbq_enqueue_elem(q, enq_table2[i]) == 0) {
cnt++;
}
}
@@ -226,7 +225,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) {
cnt = 0;
for (uint32_t i = 0; i < BUF_CNT; i++) {
- ret = bbq_dequeue(q, &deq_data);
+ ret = bbq_dequeue_elem(q, &deq_data);
if (ret == 0) {
EXPECT_EQ(deq_data, TEST_DATA_MAGIC);
cnt++;
@@ -237,41 +236,44 @@ TEST_F(bbq_example, single_drop_old_cp_value) {
EXPECT_EQ(cnt, second_cnt - q->bs);
// 空队出队失败
- EXPECT_EQ(bbq_dequeue(q, &deq_data), BBQ_QUEUE_EMPTY);
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_QUEUE_EMPTY);
}
TEST_F(bbq_example, burst_retry_new_cp_value) {
- struct bbq_s *q;
+ struct bbq *q;
uint32_t ret1 = 0;
uint32_t ret2 = 0;
uint64_t first_cnt = 4000;
uint64_t second_cnt = 1000;
-
uint16_t deq_table1[BUF_CNT] = {0};
uint16_t *deq_table2 = (uint16_t *)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t) * BUF_CNT);
+ uint32_t wait_consumed = 0;
// 创建队列
- q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_VALUE);
- EXPECT_TRUE(q);
+ q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
EXPECT_LT(first_cnt, q->bn * q->bs);
// 批量入队(全部成功)
- // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快
- ret1 = bbq_enqueue_burst_value(q, (void const *)enq_table3, first_cnt);
+ ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed);
EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret1);
// 批量入队(部分成功)
- // 不推荐,但可用于特殊场景。由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。
- ret2 = bbq_enqueue_burst_value_two_dimensional(q, (void *const *)enq_table2, second_cnt);
+ // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。
+ ret2 = bbq_enqueue_burst_elem_two_dimensional(q, (void *const *)enq_table2, second_cnt, &wait_consumed);
EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, ret1 + ret2);
// 出队列(全部成功)
- ret1 = bbq_dequeue_burst_value(q, (void *)deq_table1, first_cnt);
+ ret1 = bbq_dequeue_burst_elem(q, (void *)deq_table1, first_cnt, &wait_consumed);
EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret2);
// 出队列(部分成功)
- ret2 = bbq_dequeue_burst_value(q, (void *)deq_table2, second_cnt);
+ ret2 = bbq_dequeue_burst_elem(q, (void *)deq_table2, second_cnt, &wait_consumed);
EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, 0);
// 验证数据
for (uint32_t i = 0; i < ret1; i++) {
@@ -282,43 +284,45 @@ TEST_F(bbq_example, burst_retry_new_cp_value) {
EXPECT_EQ(deq_table2[i], TEST_DATA_MAGIC) << "i :" << i;
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
test_free(TEST_MODULE_DATA, deq_table2);
}
TEST_F(bbq_example, burst_retry_new_cp_pointer) {
- struct bbq_s *q;
+ struct bbq *q;
uint32_t ret1 = 0;
uint32_t ret2 = 0;
uint64_t first_cnt = 4000;
uint64_t second_cnt = 1000;
-
+ uint32_t wait_consumed = 0;
uint16_t *deq_table1[BUF_CNT] = {0};
uint16_t **deq_table2 = (uint16_t **)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT);
// 创建队列
- q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_RETRY_NEW | BBQ_CREATE_F_COPY_PTR);
- EXPECT_TRUE(q);
+ q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
EXPECT_LT(first_cnt, q->bn * q->bs);
// 批量入队(全部成功)
- // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快
- ret1 = bbq_enqueue_burst_ptr(q, (void *const *)enq_table1, first_cnt);
+ ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed);
EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret1);
// 批量入队(部分成功)
- // 不推荐,但可用于特殊场景。将数组成员的每个地址入队,需要循环取成员地址。
- ret2 = bbq_enqueue_burst_ptr_one_dimensional(q, (const void *)enq_table3, second_cnt);
+ ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed);
EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, ret1 + ret2);
// 出队列(全部成功)
- ret1 = bbq_dequeue_burst_ptr(q, (void **)deq_table1, first_cnt);
+ ret1 = bbq_dequeue_burst(q, (void **)deq_table1, first_cnt, &wait_consumed);
EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret2);
// 出队列(部分成功)
- ret2 = bbq_dequeue_burst_ptr(q, (void **)deq_table2, second_cnt);
+ ret2 = bbq_dequeue_burst(q, (void **)deq_table2, second_cnt, &wait_consumed);
EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, 0);
// 验证数据
for (uint32_t i = 0; i < ret1; i++) {
@@ -329,85 +333,98 @@ TEST_F(bbq_example, burst_retry_new_cp_pointer) {
EXPECT_EQ(*deq_table2[i], TEST_DATA_MAGIC) << "i :" << i;
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
test_free(TEST_MODULE_DATA, deq_table2);
}
TEST_F(bbq_example, burst_drop_old_cp_pointer) {
- struct bbq_s *q;
+ struct bbq *q;
uint32_t ret1 = 0;
uint32_t ret2 = 0;
uint64_t first_cnt = BUF_CNT;
uint64_t second_cnt = 1000;
-
+ uint32_t wait_consumed = 0;
uint16_t *deq_table1[BUF_CNT] = {0};
uint16_t **deq_table2 = (uint16_t **)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT);
// 创建队列
- q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_PTR);
- EXPECT_TRUE(q);
+ q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ ASSERT_NE(q, nullptr);
+ EXPECT_GT(second_cnt, q->bs);
EXPECT_LT(second_cnt, q->bs * q->bn);
- // 批量入队(全部成功)
- // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快
- ret1 = bbq_enqueue_burst_ptr(q, (void *const *)enq_table1, first_cnt);
+ // 批量入队(全部成功,入队个数等于队列总容量,未发生覆盖)
+ ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed);
EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret1);
// 批量入队(全部成功),覆盖了旧数据
- // 不推荐,但可用于特殊场景。将数组成员的每个地址入队,需要循环取成员地址。
- ret2 = bbq_enqueue_burst_ptr_one_dimensional(q, (const void *)enq_table3, second_cnt);
+ ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed);
EXPECT_EQ(ret2, second_cnt);
+ EXPECT_EQ(wait_consumed, second_cnt - q->bs);
// 出队列(部分成功)
- // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。
- ret1 = bbq_dequeue_burst_ptr(q, (void **)deq_table1, BUF_CNT);
+ // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。本例中第一个完整块作废。
+ ret1 = bbq_dequeue_burst(q, (void **)deq_table1, BUF_CNT, &wait_consumed);
EXPECT_EQ(ret1, second_cnt - q->bs);
+ EXPECT_EQ(wait_consumed, 0);
// 验证数据
for (uint32_t i = 0; i < ret1; i++) {
EXPECT_EQ(*deq_table1[i], TEST_DATA_MAGIC) << "i :" << i;
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ // 此时生产者和消费者在同一块上,入队个数为队列容量的N倍,由于发生了覆盖,且依旧在同一块上,数据全作废
+ for (uint32_t loop = 0; loop < 3; loop++) {
+ ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, BUF_CNT, &wait_consumed);
+ EXPECT_EQ(ret1, BUF_CNT);
+ EXPECT_TRUE(bbq_empty(q));
+ EXPECT_EQ(wait_consumed, 0);
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
test_free(TEST_MODULE_DATA, deq_table2);
}
TEST_F(bbq_example, burst_drop_old_cp_value) {
- struct bbq_s *q;
+ struct bbq *q;
uint32_t ret1 = 0;
uint32_t ret2 = 0;
uint64_t first_cnt = BUF_CNT;
uint64_t second_cnt = 1000;
-
+ uint32_t wait_consumed = 0;
uint16_t deq_table1[BUF_CNT] = {0};
// 创建队列
- q = bbq_create(BUF_CNT, sizeof(uint16_t), BBQ_CREATE_F_DROP_OLD | BBQ_CREATE_F_COPY_VALUE);
- EXPECT_TRUE(q);
+ q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ ASSERT_NE(q, nullptr);
+ EXPECT_GT(second_cnt, q->bs);
EXPECT_LT(second_cnt, q->bs * q->bn);
// 批量入队(全部成功)
- // 推荐的函数,传入的数组地址连续,内部可以memcpy批量拷贝,速度快
- ret1 = bbq_enqueue_burst_value(q, (void const *)enq_table3, first_cnt);
+ ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed);
EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret1);
// 批量入队(全部成功),覆盖了旧数据
- // 不推荐,但可用于特殊场景。将数组成员的每个地址入队,需要循环取成员地址。
- ret2 = bbq_enqueue_burst_value_two_dimensional(q, (void *const *)enq_table1, second_cnt);
+ // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。
+ ret2 = bbq_enqueue_burst_elem_two_dimensional(q, (void *const *)enq_table1, second_cnt, &wait_consumed);
EXPECT_EQ(ret2, second_cnt);
+ EXPECT_EQ(wait_consumed, second_cnt - q->bs);
// 出队列(部分成功)
// 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。
- ret1 = bbq_dequeue_burst_value(q, (void *)deq_table1, BUF_CNT);
+ ret1 = bbq_dequeue_burst_elem(q, (void *)deq_table1, BUF_CNT, &wait_consumed);
EXPECT_EQ(ret1, second_cnt - q->bs);
+ EXPECT_EQ(wait_consumed, 0);
// 验证数据
for (uint32_t i = 0; i < ret1; i++) {
EXPECT_EQ(deq_table1[i], TEST_DATA_MAGIC) << "i :" << i;
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
} \ No newline at end of file
diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc
index dd95e7e..b8bff37 100644
--- a/bbq/tests/unittest/ut_head_cursor.cc
+++ b/bbq/tests/unittest/ut_head_cursor.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-17 11:15:19
+ * @LastEditTime: 2024-06-24 10:15:57
* @Describe: TODO
*/
@@ -9,10 +9,9 @@ extern "C" {
#include "test_queue.h"
#include "ut.h"
extern bool bbq_malloc_free_equal();
-extern void bbq_memory_print();
-extern bool bbq_check_array_bounds(struct bbq_s *q);
-extern struct bbq_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags);
-extern struct bbq_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags);
+extern void bbq_debug_memory_print();
+extern bool bbq_debug_check_array_bounds(struct bbq *q);
+extern struct bbq *bbq_create_bnbs_elem(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags);
}
class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
@@ -31,45 +30,43 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test
}
};
-void expect_phead(struct bbq_s *q, uint64_t idx, uint64_t vsn, int line) {
+void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
EXPECT_EQ(bbq_idx(q, q->phead), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line;
}
-void expect_chead(struct bbq_s *q, uint64_t idx, uint64_t vsn, int line) {
+void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
EXPECT_EQ(bbq_idx(q, q->chead), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line;
}
-void expect_eq_allocated(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->allocated), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line;
}
-void expect_eq_committed(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->committed), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line;
}
-void expect_eq_consumed(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->consumed), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line;
}
-void expect_eq_reserved(struct bbq_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->reserved), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line;
}
// 初始化状态
TEST_F(bbq_head_cursor, init) {
- test_memory_counter_clear();
-
- struct bbq_s *q;
+ struct bbq *q;
uint32_t bn = 2;
uint32_t bs = 4;
- q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW);
- EXPECT_TRUE(q);
+ q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
// 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目
EXPECT_EQ(q->phead, 0);
@@ -85,15 +82,15 @@ TEST_F(bbq_head_cursor, init) {
expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__);
expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__);
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
}
void ut_produce_something(uint32_t produce_cnt) {
int ret = 0;
- struct bbq_s *q;
+ struct bbq *q;
uint32_t bn = 8;
uint32_t bs = 4096;
int enqueue_data = TEST_DATA_MAGIC;
@@ -102,12 +99,12 @@ void ut_produce_something(uint32_t produce_cnt) {
EXPECT_GT(produce_cnt, 0);
EXPECT_LE(produce_cnt, bs);
- q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW);
- EXPECT_TRUE(q);
+ q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
// 生产produce_cnt
for (uint32_t i = 0; i < produce_cnt; i++) {
- ret = bbq_enqueue(q, &enqueue_data);
+ ret = bbq_enqueue_elem(q, &enqueue_data);
EXPECT_TRUE(ret == BBQ_OK);
}
@@ -120,7 +117,7 @@ void ut_produce_something(uint32_t produce_cnt) {
// 消费完
for (uint32_t i = 0; i < produce_cnt; i++) {
- ret = bbq_dequeue(q, &dequeue_data);
+ ret = bbq_dequeue_elem(q, &dequeue_data);
EXPECT_TRUE(ret == BBQ_OK);
EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC);
}
@@ -138,13 +135,11 @@ void ut_produce_something(uint32_t produce_cnt) {
expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__);
expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__);
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
}
// 在第一块内生产,然后被消费完
TEST_F(bbq_head_cursor, produce_something) {
- test_memory_counter_clear();
-
ut_produce_something(1);
ut_produce_something(567);
ut_produce_something(789);
@@ -155,7 +150,7 @@ TEST_F(bbq_head_cursor, produce_something) {
void ut_produce_next_block(uint32_t over) {
int ret = 0;
- struct bbq_s *q;
+ struct bbq *q;
uint32_t bn = 8;
uint32_t bs = 4096;
uint32_t produce_cnt = bs + over;
@@ -165,12 +160,12 @@ void ut_produce_next_block(uint32_t over) {
EXPECT_GT(over, 0);
EXPECT_LT(over, bs);
- q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW);
- EXPECT_TRUE(q);
+ q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
// 生产至第二块的第一个entry
for (uint32_t i = 0; i < produce_cnt; i++) {
- ret = bbq_enqueue(q, &enqueue_data);
+ ret = bbq_enqueue_elem(q, &enqueue_data);
EXPECT_TRUE(ret == BBQ_OK);
}
@@ -188,7 +183,7 @@ void ut_produce_next_block(uint32_t over) {
// 消费完
for (uint32_t i = 0; i < produce_cnt; i++) {
- ret = bbq_dequeue(q, &dequeue_data);
+ ret = bbq_dequeue_elem(q, &dequeue_data);
EXPECT_TRUE(ret == BBQ_OK);
EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC);
}
@@ -205,14 +200,12 @@ void ut_produce_next_block(uint32_t over) {
expect_eq_reserved(q, &q->blocks[1], over, 1, __LINE__);
expect_eq_consumed(q, &q->blocks[1], over, 1, __LINE__);
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
}
// 第一块生产完毕,第二块生产了若干,然后被消费完
TEST_F(bbq_head_cursor, produce_next_block) {
- test_memory_counter_clear();
-
ut_produce_next_block(1);
ut_produce_next_block(123);
ut_produce_next_block(456);
@@ -223,26 +216,26 @@ TEST_F(bbq_head_cursor, produce_next_block) {
void ut_produce_all_loop(uint32_t loop) {
int ret = 0;
- struct bbq_s *q;
+ struct bbq *q;
uint32_t bn = 8;
uint32_t bs = 4096;
uint32_t produce_cnt = bn * bs;
int enqueue_data = TEST_DATA_MAGIC;
int dequeue_data = 0;
- q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW);
- EXPECT_TRUE(q);
+ q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
for (uint32_t cnt = 0; cnt < loop; cnt++) {
// 所有entry生产完毕
for (uint32_t i = 0; i < produce_cnt; i++) {
- ret = bbq_enqueue(q, &enqueue_data);
+ ret = bbq_enqueue_elem(q, &enqueue_data);
EXPECT_TRUE(ret == BBQ_OK);
}
// 消费完
for (uint32_t i = 0; i < produce_cnt; i++) {
- ret = bbq_dequeue(q, &dequeue_data);
+ ret = bbq_dequeue_elem(q, &dequeue_data);
EXPECT_TRUE(ret == BBQ_OK);
EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC);
}
@@ -263,14 +256,12 @@ void ut_produce_all_loop(uint32_t loop) {
expect_eq_consumed(q, &q->blocks[i], bs, loop, __LINE__);
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
}
// 完成多轮的满生产和满消费
TEST_F(bbq_head_cursor, produce_all_loop) {
- test_memory_counter_clear();
-
ut_produce_all_loop(1);
ut_produce_all_loop(10);
ut_produce_all_loop(23);
@@ -280,30 +271,31 @@ TEST_F(bbq_head_cursor, produce_all_loop) {
}
TEST_F(bbq_head_cursor, retry_new_full_empty) {
- test_memory_counter_clear();
int ret = 0;
uint32_t entries_cnt = 4096;
uint32_t loop = 1000;
- struct bbq_s *q;
+ struct bbq *q;
int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt);
int tmp_data = 0;
EXPECT_TRUE(data);
- q = bbq_create(entries_cnt, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_RETRY_NEW);
- EXPECT_TRUE(q);
+ q = bbq_create_elem("test_bbq", entries_cnt, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+ ASSERT_NE(q, nullptr);
+ EXPECT_TRUE(bbq_empty(q));
for (uint32_t i = 0; i < loop; i++) {
// 入满队
for (uint32_t j = 0; j < entries_cnt; j++) {
data[j] = (i + 1) * j;
- ret = bbq_enqueue(q, &data[j]);
+ ret = bbq_enqueue_elem(q, &data[j]);
EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+ EXPECT_FALSE(bbq_empty(q));
}
// 满队再入队
for (uint32_t j = 0; j < entries_cnt / 3; j++) {
- ret = bbq_enqueue(q, &data[j]);
+ ret = bbq_enqueue_elem(q, &data[j]);
EXPECT_TRUE(ret == BBQ_QUEUE_FULL);
}
@@ -319,14 +311,16 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
// 全出队
for (uint32_t j = 0; j < entries_cnt; j++) {
- ret = bbq_dequeue(q, &tmp_data);
+ EXPECT_FALSE(bbq_empty(q));
+ ret = bbq_dequeue_elem(q, &tmp_data);
EXPECT_TRUE(ret == BBQ_OK);
EXPECT_EQ(tmp_data, data[j]);
}
+ EXPECT_TRUE(bbq_empty(q));
// 空出队再出队
for (uint32_t j = 0; j < entries_cnt / 2; j++) {
- ret = bbq_dequeue(q, &tmp_data);
+ ret = bbq_dequeue_elem(q, &tmp_data);
EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY);
}
@@ -340,15 +334,13 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) {
}
test_free(TEST_MODULE_UTEST, data);
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
}
TEST_F(bbq_head_cursor, mpsc_faa) {
- test_memory_counter_clear();
-
test_info_s test_info = {
.cfg = {
.base = {
@@ -408,41 +400,44 @@ TEST_F(bbq_head_cursor, mpsc_faa) {
}
test_free(TEST_MODULE_UTEST, exit_data);
test_threads_destory(&test_info, threads);
- EXPECT_TRUE(bbq_check_array_bounds((struct bbq_s *)q.ring));
+ EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring));
test_queue_destory(&q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
}
-TEST_F(bbq_head_cursor, drop_old_full_empty1) {
- test_memory_counter_clear();
+TEST_F(bbq_head_cursor, drop_old_full_empty) {
int ret = 0;
uint32_t bn = 2;
uint32_t bs = 4;
uint32_t loop = 1000;
- struct bbq_s *q;
+ struct bbq *q;
int tmp_data = 0;
- q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_DROP_OLD);
- EXPECT_TRUE(q);
+ q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ ASSERT_NE(q, nullptr);
+ EXPECT_TRUE(bbq_empty(q));
for (uint32_t j = 0; j < loop; j++) {
// 入满队列
for (uint32_t i = 0; i < bn * bs; i++) {
- ret = bbq_enqueue(q, &i);
+ ret = bbq_enqueue_elem(q, &i);
EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+ EXPECT_FALSE(bbq_empty(q));
}
// 全出队
for (uint32_t i = 0; i < bn * bs; i++) {
- ret = bbq_dequeue(q, &tmp_data);
+ EXPECT_FALSE(bbq_empty(q));
+ ret = bbq_dequeue_elem(q, &tmp_data);
EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
EXPECT_EQ(tmp_data, i);
}
+ EXPECT_TRUE(bbq_empty(q));
// 空队再出队,失败
for (uint32_t i = 0; i < bn * bs; i++) {
- ret = bbq_dequeue(q, &tmp_data);
+ ret = bbq_dequeue_elem(q, &tmp_data);
EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret;
}
@@ -455,32 +450,41 @@ TEST_F(bbq_head_cursor, drop_old_full_empty1) {
EXPECT_EQ(q->blocks[i].consumed.load(), 0);
}
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
}
-TEST_F(bbq_head_cursor, drop_old_full_empty2) {
- test_memory_counter_clear();
+TEST_F(bbq_head_cursor, drop_old_full_empty_cover) {
int ret = 0;
uint32_t bn = 2;
uint32_t bs = 4;
uint32_t loop = 1000;
uint32_t over_cnt = bs + 2;
- struct bbq_s *q;
+ struct bbq *q;
EXPECT_EQ(over_cnt / bs, 1);
int tmp_data = 0;
- q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_CREATE_F_COPY_VALUE | BBQ_CREATE_F_DROP_OLD);
- EXPECT_TRUE(q);
+ q = bbq_create_bnbs_elem("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD);
+ ASSERT_NE(q, nullptr);
+ EXPECT_TRUE(bbq_empty(q));
// 入满队列,再入over_cnt
for (uint32_t i = 0; i < bn * bs * loop + over_cnt; i++) {
- ret = bbq_enqueue(q, &i);
+ ret = bbq_enqueue_elem(q, &i);
EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+
+ uint32_t tmpA = i / (bn * bs);
+ uint32_t tmpB = i % (bn * bs);
+ if (tmpA > 0 && (tmpB < bs)) {
+ // 覆盖第一个块时,整个块被作废,因此都是empty,从第二个块开始可读取
+ EXPECT_TRUE(bbq_empty(q)) << "i " << i << "tmpA " << tmpA << "tmpB " << tmpB;
+ } else {
+ EXPECT_FALSE(bbq_empty(q));
+ }
}
expect_phead(q, 1, loop, __LINE__);
@@ -503,12 +507,13 @@ TEST_F(bbq_head_cursor, drop_old_full_empty2) {
// 队列中的数据全出队
for (uint32_t i = 0; i < over_cnt - bs; i++) {
- ret = bbq_dequeue(q, &tmp_data);
+ ret = bbq_dequeue_elem(q, &tmp_data);
EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
}
for (uint32_t i = 0; i < bn * bs; i++) {
- ret = bbq_dequeue(q, &tmp_data);
+ EXPECT_TRUE(bbq_empty(q));
+ ret = bbq_dequeue_elem(q, &tmp_data);
EXPECT_TRUE(ret == BBQ_QUEUE_EMPTY) << "ret " << ret;
}
@@ -527,8 +532,9 @@ TEST_F(bbq_head_cursor, drop_old_full_empty2) {
i == 1 ? loop + 1 : 0, __LINE__);
EXPECT_EQ(q->blocks[i].consumed.load(), 0);
}
- EXPECT_TRUE(bbq_check_array_bounds(q));
- bbq_ring_free(q);
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
} \ No newline at end of file
diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc
index cb7c12a..daf6c97 100644
--- a/bbq/tests/unittest/ut_mix.cc
+++ b/bbq/tests/unittest/ut_mix.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-17 11:14:19
+ * @LastEditTime: 2024-06-19 22:49:24
* @Describe: bbq除了队列操作外,其他函数的测试
*/
@@ -53,8 +53,6 @@ void *fetch_max_thread_func(void *arg) {
}
TEST_F(bbq_mix, fetch_max) {
- test_memory_counter_clear();
-
uint64_t ret = 0;
ut_fetch_arg arg = {};
arg.data.store(1); // 初始化1
@@ -87,8 +85,6 @@ TEST_F(bbq_mix, fetch_max) {
}
TEST_F(bbq_mix, power_of_two) {
- test_memory_counter_clear();
-
uint32_t tmp = 0;
uint32_t max = pow(2, 32) - 1;
@@ -114,8 +110,6 @@ TEST_F(bbq_mix, power_of_two) {
}
TEST_F(bbq_mix, bbq_blocks_calc) {
- test_memory_counter_clear();
-
uint32_t tmp = 0;
uint32_t max = pow(2, 32) - 1;
@@ -148,8 +142,6 @@ TEST_F(bbq_mix, bbq_blocks_calc) {
}
TEST_F(bbq_mix, ceil_log2) {
- test_memory_counter_clear();
-
uint32_t tmp = 0;
uint32_t max = pow(2, 32) - 1;
diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc
index 8ab7276..0d29b1c 100644
--- a/bbq/tests/unittest/ut_multit.cc
+++ b/bbq/tests/unittest/ut_multit.cc
@@ -12,8 +12,8 @@ extern "C" {
#include "ut.h"
extern bool bbq_malloc_free_equal();
extern bool test_malloc_free_equal();
-extern void bbq_memory_print();
-bool bbq_check_array_bounds(struct bbq_s *q);
+extern void bbq_debug_memory_print();
+bool bbq_debug_check_array_bounds(struct bbq *q);
}
class multit : public testing::Test { // 继承了 testing::Test
@@ -89,6 +89,6 @@ TEST_F(multit, mpmc) {
}
test_free(TEST_MODULE_UTEST, exit_data);
test_threads_destory(&test_info, threads);
- EXPECT_TRUE(bbq_check_array_bounds((struct bbq_s *)q.ring));
+ EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring));
test_queue_destory(&q);
}
diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt
index 54801b1..40e8326 100644
--- a/perf/CMakeLists.txt
+++ b/perf/CMakeLists.txt
@@ -7,7 +7,7 @@ include_directories(
${CMAKE_CURRENT_SOURCE_DIR}/../bbq/tests/common
${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/iniparser
${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/rmind_ringbuf
- /home/admin/test/dpdk-23.07/build/install/include
+ /root/code/c/dpdk-21.11.4/install/include
)
# 将bbq单元测试里的公共文件,添加到perf里。
@@ -34,7 +34,7 @@ set(EXEC_PATH ${OUTPUT_DIR}/bin)
# 指定库路径
link_directories(${LIB_PATH})
link_directories(../bbq/build/output/lib/)
-link_directories(/home/admin/test/dpdk-23.07/build/install/lib64)
+link_directories(/root/code/c/dpdk-21.11.4/install/lib64 /root/code/c/dpdk-21.11.4/install/lib64/dpdk/pmds-22.0)
# 可执行程序的名字
set(BENCHMARK_NAME benchmark)
diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c
index 923fa1f..26d75de 100644
--- a/perf/benchmark/bcm_benchmark.c
+++ b/perf/benchmark/bcm_benchmark.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-14 17:34:15
+ * @LastEditTime: 2024-06-18 18:20:02
* @Describe: TODO
*/
@@ -16,7 +16,7 @@
#include <sys/prctl.h>
#include <unistd.h>
-extern void bbq_memory_print();
+extern void bbq_debug_memory_print();
extern bool bbq_malloc_free_equal();
void bcm_report_printf(test_cfg *cfg, test_merge_data *data, test_exit_data **raw_data, uint32_t thread_cnt, test_thread_type_e ttype) {
@@ -102,7 +102,7 @@ int main(int argc, char *argv[]) {
}
} else {
config = "/root/code/c/bbq-ly/perf/benchmark/config/compare/case1_simple_spsc.ini";
- ring_type = "rmind";
+ ring_type = "bbq";
burst_cnt = 16;
TEST_ERR_LOG("use default config, ringt_type:%s burst:%u config:%s argc:%d", ring_type, burst_cnt, config, argc);
}
@@ -151,7 +151,7 @@ int main(int argc, char *argv[]) {
test_free(TEST_MODULE_BCM, exit_data);
test_threads_destory(&test_info, threads);
test_queue_destory(&q);
- bbq_memory_print();
+ bbq_debug_memory_print();
test_memory_counter_print();
return 0;
diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c
index 0cd09e8..f6bce9d 100644
--- a/perf/benchmark/bcm_queue.c
+++ b/perf/benchmark/bcm_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-14 17:31:56
+ * @LastEditTime: 2024-06-21 18:07:20
* @Describe: TODO
*/
@@ -8,14 +8,14 @@
#include "ringbuf.h"
static __rte_always_inline unsigned int
-bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx) {
+bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
TEST_AVOID_WARNING(thread_idx);
- return rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL);
-}
+ unsigned int free_space = 0;
+ int ret = 0;
-static __rte_always_inline unsigned int
-bcm_dpdk_ring_dequeue_burst(struct rte_ring *r, void *obj_table, unsigned int n) {
- return rte_ring_dequeue_burst(r, (void **)obj_table, n, NULL);
+ ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space);
+ *wait_consumed = r->size - free_space - 1;
+ return ret;
}
int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) {
@@ -40,7 +40,7 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) {
flags |= RING_F_MC_RTS_DEQ;
}
- q->ring = (void *)rte_ring_create("dpdk_ring", cfg->ring.entries_cnt, rte_socket_id(), RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ);
+ q->ring = (void *)rte_ring_create("dpdk_ring", cfg->ring.entries_cnt, rte_socket_id(), flags);
if (q->ring == NULL) {
return BBQ_NULL_PTR;
}
@@ -49,7 +49,7 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) {
q->enqueue_f = (test_ring_enqueue_f)rte_ring_enqueue;
q->dequeue_f = (test_ring_dequeue_f)rte_ring_dequeue;
q->enqueue_burst_f = (test_enqueue_burst_f)bcm_dpdk_ring_enqueue_burst;
- q->dequeue_burst_f = (test_dequeue_burst_f)bcm_dpdk_ring_dequeue_burst;
+ q->dequeue_burst_f = (test_dequeue_burst_f)rte_ring_dequeue_burst;
return BBQ_OK;
}
@@ -68,7 +68,8 @@ void test_queue_free_rmind(void *ring) {
test_free(TEST_MODULE_RMIND, ring);
}
-uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx) {
+uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
+ TEST_AVOID_WARNING(wait_consumed);
uint32_t cnt = 0;
int ret = 0;
size_t off = 0;
@@ -92,8 +93,9 @@ uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, uint32_t n, uint
return cnt;
}
-uint32_t test_dequeue_burst_rmind(void *ring, void *obj_table, uint32_t n) {
+uint32_t test_dequeue_burst_rmind(void *ring, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
TEST_AVOID_WARNING(n);
+ TEST_AVOID_WARNING(wait_consumed);
size_t len = 0;
size_t off = 0;
size_t per_size = sizeof(void *);