summaryrefslogtreecommitdiff
path: root/bbq
diff options
context:
space:
mode:
authorliuyu <[email protected]>2024-07-07 22:55:37 -0400
committerliuyu <[email protected]>2024-07-07 22:55:37 -0400
commitee1cbf37fc0c08895ed70723029bfbce5f68c060 (patch)
treec6437570be6a0b9e2fa4797dbcb158eb260766db /bbq
parentd122b40e7633d24ea832175a8339324c9f43beaa (diff)
The first releaseHEADmaindev-utdev
Diffstat (limited to 'bbq')
-rw-r--r--bbq/CMakeLists.txt41
-rw-r--r--bbq/include/bbq.h360
-rw-r--r--bbq/src/CMakeLists.txt10
-rw-r--r--bbq/src/bbq.c1077
-rw-r--r--bbq/unittest/CMakeLists.txt25
-rw-r--r--bbq/unittest/ut_bbq.cc1189
-rw-r--r--bbq/unittest/ut_bbq_func.c615
-rw-r--r--bbq/unittest/ut_bbq_func.h302
8 files changed, 3619 insertions, 0 deletions
diff --git a/bbq/CMakeLists.txt b/bbq/CMakeLists.txt
new file mode 100644
index 0000000..a86ea7f
--- /dev/null
+++ b/bbq/CMakeLists.txt
@@ -0,0 +1,41 @@
+cmake_minimum_required(VERSION 3.0)
+project(BBQ)
+
+# 头文件目录
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+)
+
+# 设置输出目录
+if(NOT DEFINED OUTPUT_DIR)
+ # 如果没有被设置,则设置一个默认值
+ SET(OUTPUT_DIR ${PROJECT_SOURCE_DIR}/build/output)
+endif()
+
+# 设置编译类型,默认Release
+add_compile_options(-Wall -Wextra)
+
+if(NOT CMAKE_BUILD_TYPE)
+ set(CMAKE_BUILD_TYPE Release)
+endif()
+
+if(CMAKE_BUILD_TYPE STREQUAL "Debug")
+ add_definitions(-DBBQ_MEMORY)
+endif()
+add_definitions(-D_GNU_SOURCE)
+
+# 库生成的路径
+set(LIB_PATH ${OUTPUT_DIR}/lib)
+# 测试程序生成的路径
+set(EXEC_PATH ${OUTPUT_DIR}/bin)
+
+# 静态库的名字
+set(BBQ_LIB bbq)
+# 可执行程序的名字
+set(TESTS_NAME tests)
+
+enable_testing() # 开启测试,否则无法执行make test
+
+# 添加子目录
+add_subdirectory(src)
+add_subdirectory(unittest) \ No newline at end of file
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
new file mode 100644
index 0000000..9090848
--- /dev/null
+++ b/bbq/include/bbq.h
@@ -0,0 +1,360 @@
+/*
+ * @Author: [email protected]
+ * @LastEditTime: 2024-07-07 15:22:47
+ * @Describe: bbq(Block-based Bounded Queue)头文件
+ * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
+ */
+
+#pragma once
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#ifndef __cplusplus
+// C
+#include <stdatomic.h>
+typedef atomic_uint_fast64_t aotmic_uint64;
+#else
+// C++ 为了兼容gtest测试
+using aotmic_uint64 = std::atomic<uint64_t>;
+#endif
+
+#define BBQ_SOCKET_ID_ANY -1
+#define BBQ_SYMBOL_MAX 64
+#define BBQ_CACHE_LINE 64
+#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(BBQ_CACHE_LINE)))
+
+union bbq_atomic64 {
+ volatile uint64_t s; // single使用该字段
+ aotmic_uint64 m;
+};
+
+struct bbq_head {
+ union bbq_atomic64 value;
+} __BBQ_CACHE_ALIGNED;
+
+struct bbq_block {
+ union bbq_atomic64 committed; // 生产者,已提交(version|offset)
+ union bbq_atomic64 allocated; // 生产者,已分配(version|offset)
+ union bbq_atomic64 reserved; // 消费者,已预留(version|offset)
+ union bbq_atomic64 consumed; // 消费者,已消费(version|offset)注:在drop-old模式下没用到
+ char *entries __BBQ_CACHE_ALIGNED; // 存储大小可变的entry,每个块分配空间:bs * entry_size
+} __BBQ_CACHE_ALIGNED;
+
+typedef void *(*bbq_malloc_f)(int32_t socket_id, size_t size);
+typedef void (*bbq_free_f)(void *ptr, size_t size);
+
+struct bbq_mempool {
+ char *ptr; // 内存池起始地址
+ size_t off; // 已使用的偏移大小
+ size_t size; // 内存池总大小
+
+ bbq_malloc_f malloc_f; // 申请内存的函数,默认为malloc
+ bbq_free_f free_f; // 申请内存的函数,默认为free
+} __BBQ_CACHE_ALIGNED;
+
+struct bbq {
+ // cache line-1
+ char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED;
+
+ // cache line-2
+ int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
+ uint32_t bn; // blocks的个数
+ uint32_t bs; // blocks.entries的个数
+ uint32_t flags; // 标记:retry new 模式,还是drop old模式
+ uint32_t idx_bits; // bbq_head里idx所占的位数
+ uint32_t off_bits; // bbq_cursor里offset所占的位数
+ uint64_t idx_mask; // idx_bits偏移后的掩码
+ uint64_t off_mask; // off_bits偏移后的掩码
+ uint64_t entry_size; // blocks.entries里每个entry的大小
+ bool prod_single; // 如果为单生产者或单消费者,则single为true
+ bool cons_single; // 如果为单生产者或单消费者,则single为true
+
+ // cache line-3
+ struct bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
+
+ // cache line-4
+ struct bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
+
+ // cache line-5
+ struct {
+ union bbq_atomic64 n_enq;
+ union bbq_atomic64 n_deq;
+ } __BBQ_CACHE_ALIGNED stat;
+
+ // cache line-6
+ struct bbq_mempool memory_pool; // 仅在初始化和调试时会读写
+ struct bbq_block *blocks; // bn大小的数组
+} __BBQ_CACHE_ALIGNED;
+
+#define BBQ_F_DEFAULT 0x0
+
+#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
+#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */
+
+#define BBQ_F_SP_ENQ 0x0004
+#define BBQ_F_MP_ENQ BBQ_F_DEFAULT
+#define BBQ_F_SC_DEQ 0x0008
+#define BBQ_F_MC_DEQ BBQ_F_DEFAULT
+#define BBQ_F_ENABLE_STAT 0x0010
+#define BBQ_F_DISABLE_STAT BBQ_F_DEFAULT
+
+/**
+ * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。
+ * 对应入队函数:bbq_enqueue、bbq_enqueue_burst
+ * 对应出队函数:bbq_dequeue、bbq_dequeue_burst
+ *
+ * @param[in] name
+ * 队列名称
+ * @param[in] count
+ * 队列大小,参数必须大于1,且是2的N次方。
+ * @param[in] socket_id
+ * 多numa架构下,针对指定socket分配内存。
+ * @param[in] flags
+ * 设置入队策略:
+ * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
+ * - BBQ_F_RETRY_NEW:队列满了当前入队失败(默认)。
+ * 设置生产者模式:
+ * - BBQ_F_SP_ENQ:单生产者
+ * - BBQ_F_MP_ENQ:多生产者(默认)
+ * 设置消费者模式:
+ * - BBQ_F_SC_DEQ:单消费者
+ * - BBQ_F_MC_DEQ:多消费者(默认)
+ * 设置统计功能:
+ * 在出入队的时候同时累计成功次数,并推算出当前队列的剩余个数。注:目前仅retry new模式下支持统计功能
+ * - BBQ_F_ENABLE_STAT:开启统计功能
+ * - BBQ_F_DISABLE_STAT:关闭统计功能(默认)
+ * @return
+ * 非NULL:消息队列结构体指针,用于后续出队入队等操作。
+ * NULL:创建失败,可能存在的原因:
+ * - name或count参数超出范围
+ * - 申请内存失败
+ * - count不为2的n次方
+ * - name传入空指针
+ * - drop old模式下不支持
+ */
+extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f);
+
+/**
+ * 消息队列单个指针入队
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[in] data
+ * 指向入队指针的指针,如:
+ * int *data = malloc(sizeof(int));*data = TEST_DATA; 传入&data
+ * @return
+ * 成功返回0,失败返回小于0的错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
+ */
+extern int bbq_enqueue(struct bbq *q, void *const *data);
+
+/**
+ * 消息队列单个指针出队
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[out] data
+ * 传入二级指针,如:
+ * int *data = NULL; 传入&data
+ * @return
+ * 成功返回0,失败返回小于0的错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
+ */
+extern int bbq_dequeue(struct bbq *q, void **data);
+
+/**
+ * 消息队列批量入队(指针入队),尽可能一次入队n个指针,返回实际成功入队个数
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[in] obj_table
+ * 即将入队的指针数组,如:
+ * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT);
+ * for(int i=0;i<BUF_CNT;i++){
+ * obj_table[i] = malloc(sizeof(uint16_t));
+ * obj_table[i] = TEST_DATA;
+ * }
+ * 传入obj_table
+ * @param[in] n
+ * 尝试一次入队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列剩余的个数。注:该赋值可能会带来些许的性能损耗。
+ * @return
+ * 返回实际成功入队的个数。如果始终返回0,可能存在的错误原因:
+ * - 传入空指针
+ * - 队列已满
+ * - 队列忙碌中
+ */
+extern uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed);
+
+/**
+ * 消息队列批量指针出队,尽可能一次出队n个数据,返回实际成功出队个数
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[out] obj_table
+ * 用于存储出队的指针,如:
+ * uint16_t **obj_table = malloc(sizeof(uint16_t *)); 传入obj_table
+ * @param[in] n
+ * 尝试一次出队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。注:该赋值可能会带来些许的性能损耗
+ * @return
+ * 返回实际出队的个数,如果始终返回0,可能存在的原因:
+ * - 传入空指针
+ * - 队列已空
+ * - 队列忙碌中
+ */
+extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed);
+
+/**
+ * 创建bbq队列,使用当前函数创建的队列,在后续操作会把指针指向的数据拷贝入队。
+ * 对应入队函数:bbq_enqueue_elem、bbq_enqueue_burst_elem
+ * 对应出队函数:bbq_dequeue_elem、bbq_dequeue_burst_elem
+ *
+ * @param[in] name
+ * 队列名称
+ * @param[in] count
+ * 队列大小,参数必须大于1,且是2的N次方。
+ * @param[in] socket_id
+ * 多numa架构下,针对指定socket分配内存。
+ * @param[in] flags
+ * 设置入队策略:
+ * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
+ * - BBQ_F_RETRY_NEW:队列满了当前入队失败(默认)。
+ * 设置生产者模式:
+ * - BBQ_F_SP_ENQ:单生产者
+ * - BBQ_F_MP_ENQ:多生产者(默认)
+ * 设置消费者模式:
+ * - BBQ_F_SC_DEQ:单消费者
+ * - BBQ_F_MC_DEQ:多消费者(默认)
+ * 设置统计功能:
+ * 在出入队的时候同时累计成功次数,并推算出当前队列的剩余个数。注:目前仅retry new模式下支持统计功能
+ * - BBQ_F_ENABLE_STAT:开启统计功能
+ * - BBQ_F_DISABLE_STAT:关闭统计功能(默认)
+ * @return
+ * 非NULL:消息队列结构体指针,用于后续出队入队等操作。
+ * NULL:创建失败。可能存在的错误原因:
+ * - name或count参数超出范围
+ * - 申请内存失败
+ * - count不为2的n次方
+ * - name传入空指针
+ * - drop old模式下不支持
+ */
+extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f);
+
+/**
+ * 消息队列单个数据入队(指针指向的数据将被拷贝)
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[in] data
+ * 传入一级指针,如:int data = 1; 传入&data
+ * @return
+ * 成功返回0,失败返回小于0的错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
+ */
+extern int bbq_enqueue_elem(struct bbq *q, void const *data);
+
+/**
+ * 消息队列单个数据出队
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[in] data
+ * 则传入一级指针,如:int data; 传入&data
+ * @return
+ * 成功返回0,失败返回小于0的错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
+ */
+extern int bbq_dequeue_elem(struct bbq *q, void *data);
+
+/**
+ * 消息队列批量入队(数据入队),尽可能一次入队n个数据,返回实际成功入队个数
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[in] obj_table
+ * 将数组里的每个数据入队,如:
+ * uint16_t obj_table[1024] = {初始化数据}; 传入obj_table
+ * @param[in] n
+ * 尝试一次入队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗
+ * @return
+ * 返回实际成功入队的个数。如果始终返回0,可能存在的错误原因:
+ * - 传入空指针
+ * - 队列已满
+ * - 队列忙碌中
+ */
+extern uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed);
+
+/**
+ * 消息队列批量出队(数据出队),尽可能一次出队n个数据,返回实际成功出队个数
+ *
+ * @param[in] q
+ * 队列指针
+ * @param[out] obj_table
+ * 存储出队的数据,如:
+ * uint16_t obj_table[BUF_CNT] = {0}; 传入(void *)obj_table
+ * @param[in] n
+ * 尝试一次出队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。注:该赋值可能会带来些许的性能损耗
+ * @return
+ * 返回实际出队的个数,如果始终返回0,可能存在的原因:
+ * - 传入空指针
+ * - 队列已空
+ * - 队列忙碌中
+ */
+extern uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed);
+
+/**
+ * 用于释放消息队列。
+ *
+ * @param[in] q
+ * 队列指针
+ * @return
+ * - true: 队列为空返回
+ * - false: 队列非空
+ */
+bool bbq_empty(struct bbq *q);
+
+/**
+ * 用于释放消息队列。
+ *
+ * @param[in] q
+ * 队列指针
+ */
+extern void bbq_destory(struct bbq *q);
+
+// 错误码
+#define BBQ_OK 0 // 成功
+#define BBQ_ERR -1 // 通用错误,无法分类时使用
+#define BBQ_ERR_ALLOC -2 // 内存分配失败
+#define BBQ_ERR_INPUT_NULL -3 // 传入空指针
+#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方
+#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围
+#define BBQ_ERR_STAT_NOT_SUPPORT -6 // 不支持统计
+
+#define BBQ_ERR_FULL -101 // 队列已满(入队失败)
+#define BBQ_ERR_BUSY -102 // 队列忙碌中(入队或出队失败)
+#define BBQ_ERR_EMPTY -103 // 队列已空(出队失败)
+#define BBQ_ERR_NOT_SUPPORT -104 // 不支持的操作 \ No newline at end of file
diff --git a/bbq/src/CMakeLists.txt b/bbq/src/CMakeLists.txt
new file mode 100644
index 0000000..fa91e18
--- /dev/null
+++ b/bbq/src/CMakeLists.txt
@@ -0,0 +1,10 @@
+cmake_minimum_required(VERSION 3.0)
+project(BBQ_LIB)
+
+file(GLOB SRC_LIST ${CMAKE_CURRENT_SOURCE_DIR}/*.c) #搜索当前cmake所在目录下的c文件
+set(LIBRARY_OUTPUT_PATH ${LIB_PATH}) #设置库生成目录
+
+add_library(${BBQ_LIB} STATIC ${SRC_LIST}) #生成静态库
+# add_library(${BBQ_LIB} SHARED ${SRC_LIST}) #生成动态库
+
+target_link_libraries(${BBQ_LIB} numa) # 链接库
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
new file mode 100644
index 0000000..0ae6c5d
--- /dev/null
+++ b/bbq/src/bbq.c
@@ -0,0 +1,1077 @@
+/*
+ * @Author: liuyu
+ * @LastEditTime: 2024-07-07 17:44:50
+ * @Email: [email protected]
+ * @Describe: bbq(Block-based Bounded Queue)实现
+ * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
+ */
+#include "bbq.h"
+#include <math.h>
+#include <string.h>
+
+// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针"
+#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */
+#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */
+
+// 判断flags标记位
+#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD)
+#define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE)
+#define BBQ_F_CHK_SP_ENQ(flags) (flags & BBQ_F_SP_ENQ)
+#define BBQ_F_CHK_SC_DEQ(flags) (flags & BBQ_F_SC_DEQ)
+#define BBQ_F_CHK_STAT_ENABLE(flags) (flags & BBQ_F_ENABLE_STAT)
+
+// 避免无用参数的编译告警
+#define AVOID_WARNING(param) ((void)param)
+
+// 在Debug编译时,BBQ_MEMORY宏定义开关被打开,将在每个块的末尾多分配一个entry,值为BBQ_MEM_MAGIC,方便排查越界问题。
+#ifdef BBQ_MEMORY
+#define BBQ_MEM_MAGIC 0xFE
+#endif
+
+#define bbq_likely(x) __builtin_expect(!!(x), 1)
+#define bbq_unlikely(x) __builtin_expect(!!(x), 0)
+
+struct bbq_status {
+ int32_t status; // 返回状态
+ uint32_t actual_burst; // 实际出/入队个数
+};
+
+enum bbq_queue_state {
+ BBQ_SUCCESS = 0,
+ BBQ_BLOCK_DONE, // 当前块的entry已用完,需要移动到下一个块
+ BBQ_NO_ENTRY, // 队列里没有entry可以使用了
+ BBQ_NOT_AVAILABLE, // 当前块不可以用状态(将返回busy)
+ BBQ_ALLOCATED, // 已分配,返回entry信息
+ BBQ_RESERVED, // 已保留,返回entry信息
+};
+
+struct bbq_entry_desc {
+ uint64_t vsn; // allocated或reserved的版本(vsn)
+ uint64_t off; // entry在当前块的偏移(offset)
+ uint32_t actual_burst; // 实际出/入队个数
+ struct bbq_block *block; // 指向所在的块
+};
+
+struct bbq_queue_state_s {
+ enum bbq_queue_state state; // 队列状态
+ union {
+ uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效
+ struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效
+ };
+};
+
+extern inline uint64_t bbq_head_idx(struct bbq *q, uint64_t x) {
+ return x & q->idx_mask;
+}
+
+extern inline uint64_t bbq_head_vsn(struct bbq *q, uint64_t x) {
+ return x >> q->idx_bits;
+}
+
+extern inline uint64_t bbq_cur_off(struct bbq *q, uint64_t x) {
+ return x & q->off_mask;
+}
+
+extern inline uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x) {
+ return x >> q->off_bits;
+}
+
+static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) {
+ return ver << q->off_bits;
+}
+
+extern inline uint64_t bbq_atomic64_load(union bbq_atomic64 *atomic, bool single) {
+ if (single) {
+ return atomic->s;
+ } else {
+ return atomic_load(&atomic->m);
+ }
+}
+
+extern inline void bbq_atomic64_store(union bbq_atomic64 *atomic, uint64_t value, bool single) {
+ if (single) {
+ atomic->s = value;
+ } else {
+ atomic_store(&atomic->m, value);
+ }
+}
+static inline uint64_t bbq_atomic64_fetch_add(union bbq_atomic64 *atomic, uint64_t value, bool single) {
+ if (single) {
+ uint64_t old = atomic->s;
+ atomic->s += value;
+ return old;
+ } else {
+ return atomic_fetch_add(&atomic->m, value);
+ }
+}
+
+/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
+uint64_t bbq_fetch_max(union bbq_atomic64 *atomic, uint64_t upd, bool single) {
+ uint64_t old_value = 0;
+
+ if (single) {
+ old_value = atomic->s;
+ atomic->s = upd;
+ } else {
+ do {
+ old_value = atomic_load(&atomic->m);
+ } while (old_value < upd && !atomic_compare_exchange_weak(&atomic->m, &old_value, upd));
+ }
+
+ return old_value;
+}
+
+/* 检查参数是否为2的N次幂 */
+bool bbq_check_power_of_two(uint32_t n) {
+ if (n == 0) {
+ return false;
+ }
+
+ return (n & (n - 1)) == 0;
+}
+
+/* 根据entries大小返回合理的block个数
+ * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/
+static uint32_t bbq_block_number_calc(uint32_t entries) {
+ double log_entries = log2((double)entries);
+ uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整
+ uint32_t max_value = (over4 > 1) ? over4 : 1;
+ uint32_t n = pow(2, max_value);
+
+ return n;
+}
+
+/* 根据entries大小返回合理的bn、bs*/
+int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) {
+ if (bn == NULL || bs == NULL) {
+ return BBQ_ERR_INPUT_NULL;
+ }
+
+ if (entries <= 1) {
+ return BBQ_ERR_OUT_OF_RANGE;
+ }
+
+ if (bbq_check_power_of_two(entries) == false) {
+ return BBQ_ERR_POWER_OF_TWO;
+ }
+
+ *bn = bbq_block_number_calc(entries);
+ *bs = entries / *bn;
+
+ return BBQ_OK;
+}
+
+void *bbq_malloc_from_pool(struct bbq *q, size_t size) {
+ if (q->memory_pool.size - q->memory_pool.off < size) {
+ return NULL;
+ }
+
+ char *mem = q->memory_pool.ptr + q->memory_pool.off;
+ q->memory_pool.off += size;
+
+ return mem;
+}
+
+/* 块初始化 */
+int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) {
+ size_t size = 0;
+
+#ifdef BBQ_MEMORY
+ // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题
+ size = (q->bs + 1) * q->entry_size;
+ block->entries = bbq_malloc_from_pool(q, size);
+ char *last_entry = block->entries + q->entry_size * q->bs;
+ memset(block->entries, 0, size);
+ memset(last_entry, BBQ_MEM_MAGIC, q->entry_size);
+#else
+ size = q->bs * q->entry_size;
+ block->entries = bbq_malloc_from_pool(q, size);
+ memset(block->entries, 0, size);
+#endif
+
+ if (block->entries == NULL) {
+ return BBQ_ERR_ALLOC;
+ }
+
+ if (cursor_init) {
+ // block数组里,除了第一块之外需要设置
+ bbq_atomic64_store(&block->committed, q->bs, q->prod_single);
+ bbq_atomic64_store(&block->allocated, q->bs, q->prod_single);
+ bbq_atomic64_store(&block->reserved, q->bs, q->cons_single);
+ if (!BBQ_F_CHK_DROP_OLD(q->flags)) {
+ bbq_atomic64_store(&block->consumed, q->bs, q->cons_single);
+ }
+ }
+
+ return BBQ_OK;
+}
+
+/* 块清理函数,与block_init成对*/
+void block_destory(struct bbq *q, struct bbq_block *block) {
+ if (block->entries) {
+#ifdef BBQ_MEMORY
+ q->memory_pool.free_f(block->entries, (q->bs + 1) * q->entry_size);
+#else
+ q->memory_pool.free_f(block->entries, q->bs * q->entry_size);
+#endif
+ block->entries = NULL;
+ }
+}
+
+/*
+求x在二进制表示中最高位1所在的位置,x参数不能为0。
+例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1)
+*/
+static unsigned bbq_floor_log2(uint64_t x) {
+ return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1);
+}
+
+/*
+返回以2为底x的对数,并向上取整值。
+例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128)
+*/
+static unsigned bbq_ceil_log2(uint64_t x) {
+ return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1;
+}
+
+/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */
+static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ size_t obj_size, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
+ int ret = 0;
+ size_t size = 0;
+ if (bbq_check_power_of_two(bn) == false) {
+ return NULL;
+ }
+
+ if (bbq_check_power_of_two(bs) == false) {
+ return NULL;
+ }
+
+ if (name == NULL) {
+ return NULL;
+ }
+
+ if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0) {
+ return NULL;
+ }
+
+ if (BBQ_F_CHK_DROP_OLD(flags) && BBQ_F_CHK_STAT_ENABLE(flags)) {
+ return NULL;
+ }
+
+ if (malloc_f == NULL || free_f == NULL) {
+ return NULL;
+ }
+
+ uint32_t all_size = 0;
+#ifdef BBQ_MEMORY
+ all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * (bs + 1) * obj_size;
+#else
+ all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * bs * obj_size;
+#endif
+ struct bbq *q = malloc_f(socket_id, all_size);
+ if (q == NULL) {
+ return NULL;
+ }
+
+ memset(q, 0, all_size);
+ q->memory_pool.size = all_size;
+ q->memory_pool.ptr = (char *)q;
+ q->memory_pool.off += sizeof(struct bbq);
+ q->memory_pool.malloc_f = malloc_f;
+ q->memory_pool.free_f = free_f;
+ q->bn = bn;
+ q->bs = bs;
+ q->entry_size = obj_size;
+ q->socket_id = socket_id;
+ strncpy(q->name, name, sizeof(q->name) - 1);
+ q->name[sizeof(q->name) - 1] = '\0';
+
+ if (BBQ_F_CHK_SP_ENQ(flags)) {
+ q->prod_single = true;
+ }
+ if (BBQ_F_CHK_SC_DEQ(flags)) {
+ q->cons_single = true;
+ }
+ q->flags = flags;
+
+ size = bn * sizeof(*q->blocks);
+ q->blocks = bbq_malloc_from_pool(q, size);
+ if (q->blocks == NULL) {
+ goto error;
+ }
+ memset(q->blocks, 0, size);
+
+ for (uint32_t i = 0; i < bn; ++i) {
+ ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true));
+ if (ret != BBQ_OK) {
+ goto error;
+ }
+ }
+
+ q->idx_bits = bbq_ceil_log2(bn);
+ uint32_t off_bits = bbq_ceil_log2(bs);
+ if (off_bits < 13) {
+ off_bits = 13;
+ }
+ q->off_bits = off_bits; // 多线程同时FAA,可能会超过最大索引,因此多分配一些空间,防止FAA溢出 TODO:多少合适?ver溢出问题?
+
+ q->idx_mask = (1 << q->idx_bits) - 1;
+ q->off_mask = (1 << q->off_bits) - 1;
+
+ return q;
+
+error:
+ bbq_destory(q);
+ return NULL;
+}
+
+/* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */
+struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f);
+}
+
+/* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */
+struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ size_t obj_size, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f);
+}
+
+/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */
+struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
+ uint32_t bn = 0;
+ uint32_t bs = 0;
+
+ if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) {
+ return NULL;
+ }
+
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f);
+}
+
+struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
+ uint32_t bn = 0;
+ uint32_t bs = 0;
+
+ if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) {
+ return NULL;
+ }
+
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f);
+}
+
+/* 释放消息队列,与bbq_ring_create系列接口成对*/
+void bbq_destory(struct bbq *q) {
+ if (q == NULL) {
+ return;
+ }
+
+ q->memory_pool.free_f(q->memory_pool.ptr, q->memory_pool.size);
+}
+
+#define BBQ_DATA_TYPE_SINGLE 0x0
+#define BBQ_DATA_TYPE_1D_ARRAY 0x1
+#define BBQ_DATA_TYPE_2D_ARRAY 0x2
+void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) {
+ size_t idx = e->off * q->entry_size;
+
+ if (BBQ_F_CHK_COPY_VALUE(q->flags)) {
+ // 数据入队列
+ switch (data_type) {
+ case BBQ_DATA_TYPE_1D_ARRAY:
+ case BBQ_DATA_TYPE_SINGLE:
+ memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
+ break;
+ case BBQ_DATA_TYPE_2D_ARRAY: {
+ void **tmp = (void **)data;
+ char *entry = &(e->block->entries[idx]);
+ for (size_t i = 0; i < e->actual_burst; i++) {
+ memcpy(entry, *tmp, q->entry_size);
+ entry += q->entry_size;
+ tmp++;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ } else {
+ // 指针入队列
+ switch (data_type) {
+ case BBQ_DATA_TYPE_2D_ARRAY:
+ case BBQ_DATA_TYPE_SINGLE:
+ // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0])
+ memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
+ break;
+ case BBQ_DATA_TYPE_1D_ARRAY:
+ default:
+ break;
+ }
+ }
+ bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst, q->prod_single);
+}
+
+struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) {
+ struct bbq_queue_state_s state = {0};
+ uint64_t ph_idx = bbq_head_idx(q, ph);
+ bool prod_single = q->prod_single;
+
+ struct bbq_block *block = &(q->blocks[ph_idx]);
+ if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated, prod_single)) >= q->bs) {
+ state.state = BBQ_BLOCK_DONE;
+ return state;
+ }
+
+ uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n, prod_single);
+ uint64_t cur_vsn = bbq_cur_vsn(q, old);
+ uint64_t cur_off = bbq_cur_off(q, old);
+
+ if (cur_off >= q->bs) {
+ state.state = BBQ_BLOCK_DONE;
+ return state;
+ }
+
+ if (cur_off + n <= q->bs) {
+ // 可以全部入队
+ state.e.actual_burst = n;
+ } else {
+ // 部分入队
+ state.e.actual_burst = q->bs - cur_off;
+ }
+ state.e.block = block;
+ state.e.vsn = cur_vsn;
+ state.e.off = cur_off;
+ state.state = BBQ_ALLOCATED;
+
+ return state;
+}
+
+enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
+ uint64_t cur = 0;
+ // 获取下一个block
+ struct bbq_block *n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]);
+ uint64_t ph_vsn = bbq_head_vsn(q, ph);
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ cur = bbq_atomic64_load(&n_blk->committed, prod_single);
+ // 生产者head避免覆盖上一轮尚未完全提交的区块
+ if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) {
+ return BBQ_NOT_AVAILABLE;
+ }
+ } else {
+ cur = bbq_atomic64_load(&n_blk->consumed, cons_single);
+ uint64_t reserved;
+ uint64_t consumed_off = bbq_cur_off(q, cur);
+ uint64_t consumed_vsn = bbq_cur_vsn(q, cur);
+
+ if (consumed_vsn < ph_vsn ||
+ (consumed_vsn == ph_vsn && consumed_off != q->bs)) {
+ // 生产者赶上了消费者
+ reserved = bbq_atomic64_load(&n_blk->reserved, cons_single);
+ if (bbq_cur_off(q, reserved) == consumed_off) {
+ return BBQ_NO_ENTRY;
+ } else {
+ return BBQ_NOT_AVAILABLE;
+ }
+ }
+ }
+
+ // 用head的version值初始化下一个块,version在高位,version+1,index或offset也会被清零
+ uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1);
+ // 其他线程完成了head更新,当前bbq_fetch_max不会再更新,可能在以下两种情况:
+ // 1)实际ph_vsn与本次要更新的ph_vsn相同。
+ // 2)当前ph_vsn已经落后于实际的ph_vsn(且移动到了下一轮),
+ bbq_fetch_max(&n_blk->committed, new_vsn, prod_single);
+ bbq_fetch_max(&n_blk->allocated, new_vsn, prod_single);
+
+ // ph+1,当超过索引范围,进入下一轮时,version会自动+1
+ bbq_fetch_max(&q->phead.value, ph + 1, prod_single);
+ return BBQ_SUCCESS;
+}
+
+bool bbq_empty(struct bbq *q) {
+ return bbq_atomic64_load(&q->stat.n_enq, q->prod_single) == bbq_atomic64_load(&q->stat.n_deq, q->cons_single);
+}
+
+static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) {
+ uint64_t enq_now = 0;
+ uint64_t deq_now = 0;
+
+ if (enq_update == 0) {
+ enq_now = bbq_atomic64_load(&q->stat.n_enq, q->prod_single);
+ } else {
+ enq_now = enq_update;
+ }
+
+ if (deq_update == 0) {
+ deq_now = bbq_atomic64_load(&q->stat.n_deq, q->cons_single);
+ } else {
+ deq_now = deq_update;
+ }
+
+ return enq_now - deq_now;
+}
+
+/* 消息队列入队 */
+static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data,
+ uint32_t n, uint32_t data_type,
+ uint32_t *wait_consumed) {
+ uint64_t enq_update = 0;
+ struct bbq_status ret = {.status = 0, .actual_burst = 0};
+ bool prod_single = q->prod_single;
+
+ if (bbq_unlikely(q == NULL || data == NULL)) {
+ ret.status = BBQ_ERR_INPUT_NULL;
+ return ret;
+ }
+
+ while (true) {
+ uint64_t ph = bbq_atomic64_load(&q->phead.value, prod_single);
+ struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n);
+
+ switch (state.state) {
+ case BBQ_ALLOCATED:
+ bbq_commit_entry(q, &state.e, data, data_type);
+ ret.actual_burst = state.e.actual_burst;
+ ret.status = BBQ_OK;
+
+ if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
+ enq_update = bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst, prod_single) +
+ state.e.actual_burst;
+ }
+ break;
+ case BBQ_BLOCK_DONE: {
+ enum bbq_queue_state pstate = advance_phead(q, ph);
+ if (pstate == BBQ_SUCCESS) {
+ continue;
+ }
+
+ if (pstate == BBQ_NO_ENTRY) {
+ ret.status = BBQ_ERR_FULL;
+ } else if (pstate == BBQ_NOT_AVAILABLE) {
+ ret.status = BBQ_ERR_BUSY;
+ } else {
+ ret.status = BBQ_ERR;
+ }
+
+ break;
+ }
+ default:
+ ret.status = BBQ_ERR;
+ break;
+ }
+
+ if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) {
+ *wait_consumed = bbq_wait_consumed_get(q, enq_update, 0);
+ }
+
+ return ret;
+ }
+}
+
+int bbq_enqueue(struct bbq *q, void *const *data) {
+ struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
+ return ret.status;
+}
+
+int bbq_enqueue_elem(struct bbq *q, void const *data) {
+ struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
+ return ret.status;
+}
+
+/* 更新reserved,成功则返回更新的个数 */
+uint32_t bbq_reserve_update(union bbq_atomic64 *atomic, uint64_t reserved, uint32_t n, bool single) {
+ if (single) {
+ atomic->s += n;
+ return n;
+ } else {
+ // 不能用fetch_max,比如a线程burst2,b线程burst3,a更新成功,b也更新成功。
+ bool ret = atomic_compare_exchange_weak(&atomic->m, &reserved, reserved + n);
+ return ret == true ? n : 0;
+ }
+}
+
+struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
+ uint32_t cont = 0;
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+
+ while (true) {
+ struct bbq_queue_state_s state;
+ uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single);
+ uint64_t reserved_off = bbq_cur_off(q, reserved);
+ uint64_t reserved_svn = bbq_cur_vsn(q, reserved);
+
+ if (reserved_off < q->bs) {
+ uint64_t committed = bbq_atomic64_load(&block->committed, prod_single);
+ uint64_t committed_off = bbq_cur_off(q, committed);
+ if (committed_off == reserved_off) {
+ state.state = BBQ_NO_ENTRY;
+ return state;
+ }
+
+ // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据
+ if (committed_off != q->bs) {
+ uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single);
+ if (bbq_cur_off(q, allocated) != committed_off) {
+ state.state = BBQ_NOT_AVAILABLE;
+ return state;
+ }
+ }
+
+ uint32_t tmp = committed_off - reserved_off;
+ uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n, q->cons_single);
+ if (reserved_cnt > 0) {
+ state.state = BBQ_RESERVED;
+ state.e.actual_burst = reserved_cnt;
+ state.e.block = block;
+ state.e.off = reserved_off;
+ state.e.vsn = reserved_svn;
+
+ return state;
+ } else {
+ // 已经被其他线程更新过了,当前数据为旧数据,需要重新获取
+ cont++;
+ continue;
+ }
+ }
+
+ state.state = BBQ_BLOCK_DONE;
+ state.vsn = reserved_svn;
+ return state;
+ }
+}
+
+bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) {
+ size_t idx = e->off * q->entry_size;
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+
+ if (BBQ_F_CHK_COPY_VALUE(q->flags)) {
+ switch (data_type) {
+ case BBQ_DATA_TYPE_1D_ARRAY:
+ case BBQ_DATA_TYPE_SINGLE:
+ memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
+ break;
+ case BBQ_DATA_TYPE_2D_ARRAY: {
+ void **tmp = (void **)deq_data;
+ char *entry = &(e->block->entries[idx]);
+ for (size_t i = 0; i < e->actual_burst; i++) {
+ memcpy(*tmp, entry, q->entry_size);
+ entry += q->entry_size;
+ tmp++;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ } else {
+ switch (data_type) {
+ case BBQ_DATA_TYPE_2D_ARRAY:
+ case BBQ_DATA_TYPE_SINGLE:
+ memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
+ break;
+ case BBQ_DATA_TYPE_1D_ARRAY:
+ default:
+ break;
+ }
+ }
+
+ uint64_t allocated;
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head?
+ allocated = bbq_atomic64_load(&e->block->allocated, prod_single);
+ // 预留的entry所在的块,已经被新生产的数据赶上了
+ if (bbq_cur_vsn(q, allocated) != e->vsn) {
+ return false;
+ }
+ } else {
+ bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst, cons_single);
+ }
+
+ return true;
+}
+
+bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
+ uint64_t ch_idx = bbq_head_idx(q, ch);
+ struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+ uint64_t ch_vsn = bbq_head_vsn(q, ch);
+ uint64_t committed = bbq_atomic64_load(&n_blk->committed, prod_single);
+ uint64_t committed_vsn = bbq_cur_vsn(q, committed);
+
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序.
+ // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1
+ if (committed_vsn < ver + (ch_idx == 0)) {
+ return false;
+ }
+
+ bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn), cons_single);
+ } else {
+ if (committed_vsn != ch_vsn + 1) {
+ // 消费者追上了生产者,下一块还未开始生产
+ return false;
+ }
+ uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1);
+ bbq_fetch_max(&n_blk->consumed, new_vsn, cons_single);
+ bbq_fetch_max(&n_blk->reserved, new_vsn, cons_single);
+ }
+
+ bbq_fetch_max(&q->chead.value, ch + 1, cons_single);
+ return true;
+}
+
+/* 消息队列出队 */
+static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) {
+ uint64_t deq_update = 0;
+ bool cons_single = q->cons_single;
+ struct bbq_status ret = {.status = 0, .actual_burst = 0};
+ if (bbq_unlikely(q == NULL || deq_data == NULL)) {
+ ret.status = BBQ_ERR_INPUT_NULL;
+ return ret;
+ }
+
+ while (true) {
+ uint64_t ch = bbq_atomic64_load(&q->chead.value, cons_single);
+ struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]);
+ struct bbq_queue_state_s state;
+ state = bbq_reserve_entry(q, blk, n);
+
+ switch (state.state) {
+ case BBQ_RESERVED:
+ if (!consume_entry(q, &state.e, deq_data, data_type)) {
+ continue;
+ }
+ ret.status = BBQ_OK;
+ ret.actual_burst = state.e.actual_burst;
+
+ if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
+ deq_update = bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst, cons_single) +
+ state.e.actual_burst;
+ }
+ break;
+ case BBQ_NO_ENTRY:
+ ret.status = BBQ_ERR_EMPTY;
+ break;
+ case BBQ_NOT_AVAILABLE:
+ ret.status = BBQ_ERR_BUSY;
+ break;
+ case BBQ_BLOCK_DONE:
+ if (advance_chead(q, ch, state.vsn)) {
+ continue;
+ }
+ ret.status = BBQ_ERR_EMPTY;
+ break;
+ default:
+ ret.status = BBQ_ERR;
+ break;
+ }
+
+ if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) {
+ *wait_consumed = bbq_wait_consumed_get(q, 0, deq_update);
+ }
+
+ return ret;
+ }
+}
+
+int bbq_dequeue(struct bbq *q, void **data) {
+ struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
+ return ret.status;
+}
+
+int bbq_dequeue_elem(struct bbq *q, void *data) {
+ struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
+ return ret.status;
+}
+
+static inline uint32_t bbq_max_burst(struct bbq *q, uint32_t n) {
+ if (bbq_unlikely(n > q->bs)) {
+ return q->bs;
+ }
+
+ return n;
+}
+
+static uint32_t bbq_dequeue_burst_1d_array(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ if (bbq_unlikely(q == NULL || obj_table == NULL)) {
+ return 0;
+ }
+
+ if (bbq_unlikely(!BBQ_F_CHK_COPY_VALUE(q->flags))) {
+ return 0;
+ }
+
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void *obj = obj_table;
+ struct bbq_status ret = {0};
+
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_1D_ARRAY, wait_consumed);
+ if (ret.status != BBQ_OK) {
+ break;
+ }
+ obj += q->entry_size * ret.actual_burst;
+ ready += ret.actual_burst;
+ }
+
+ return ready;
+}
+
+static uint32_t bbq_dequeue_burst_2d_array(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) {
+ if (bbq_unlikely(q == NULL || obj_table == NULL)) {
+ return 0;
+ }
+
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void **obj_table_tmp = obj_table;
+ struct bbq_status ret = {0};
+
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_2D_ARRAY, wait_consumed);
+ if (ret.status != BBQ_OK) {
+ break;
+ }
+ obj_table_tmp += ret.actual_burst;
+ ready += ret.actual_burst;
+ }
+
+ return ready;
+}
+
+/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
+static uint32_t bbq_enqueue_burst_1d_array(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ if (bbq_unlikely(q == NULL || obj_table == NULL)) {
+ return 0;
+ }
+
+ if (bbq_unlikely(!BBQ_F_CHK_COPY_VALUE(q->flags))) {
+ return 0;
+ }
+
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void const *obj = obj_table;
+ struct bbq_status ret = {0};
+
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_1D_ARRAY, wait_consumed);
+ if (ret.status != BBQ_OK) {
+ break;
+ }
+ obj += q->entry_size * ret.actual_burst;
+ ready += ret.actual_burst;
+ }
+
+ return ready;
+}
+
+/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
+static uint32_t bbq_enqueue_burst_2d_array(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ if (bbq_unlikely(q == NULL || obj_table == NULL)) {
+ return 0;
+ }
+
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void *const *obj_table_tmp = obj_table;
+ struct bbq_status ret = {0};
+
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_2D_ARRAY, wait_consumed);
+ if (ret.status != BBQ_OK) {
+ break;
+ }
+ obj_table_tmp += ret.actual_burst;
+ ready += ret.actual_burst;
+ }
+
+ return ready;
+}
+
+uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_enqueue_burst_1d_array(q, obj_table, n, wait_consumed);
+}
+
+uint32_t bbq_enqueue_burst_elem_2d_array(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_enqueue_burst_2d_array(q, obj_table, n, wait_consumed);
+}
+
+uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_enqueue_burst_2d_array(q, obj_table, n, wait_consumed);
+}
+
+uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_dequeue_burst_2d_array(q, obj_table, n, wait_consumed);
+}
+
+uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) {
+ return bbq_dequeue_burst_1d_array(q, obj_table, n, wait_consumed);
+}
+
+bool bbq_debug_check_array_bounds(struct bbq *q) {
+#ifdef BBQ_MEMORY
+ void *value = malloc(q->entry_size);
+ memset(value, BBQ_MEM_MAGIC, q->entry_size);
+
+ for (size_t i = 0; i < q->bn; i++) {
+ // 针对内存检查版本,申请了bs+1个entry
+ char *last_entry = q->blocks[i].entries + q->bs * q->entry_size;
+ if (memcmp(last_entry, value, q->entry_size) != 0) {
+ return false;
+ }
+ }
+#else
+ AVOID_WARNING(q);
+#endif
+ return true;
+}
+
+#if 0
+#include <stdio.h>
+/* 位置有关代码,需要-fPIC,这里注释掉用于调试 */
+void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+
+ uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single);
+ uint64_t committed = bbq_atomic64_load(&block->committed, prod_single);
+ uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single);
+ uint64_t consumed = bbq_atomic64_load(&block->consumed, cons_single);
+ printf(" allocated:%lu(ver:%lu) committed:%lu(ver:%lu) reserved:%lu(ver:%lu)",
+ bbq_cur_off(q, allocated), bbq_cur_vsn(q, allocated),
+ bbq_cur_off(q, committed), bbq_cur_vsn(q, committed),
+ bbq_cur_off(q, reserved), bbq_cur_vsn(q, reserved));
+ if (BBQ_F_CHK_DROP_OLD(q->flags)) {
+ printf("\n");
+ } else {
+ printf(" consumed:%lu(ver:%lu)\n", bbq_cur_off(q, consumed), bbq_cur_vsn(q, consumed));
+ }
+}
+
+void bbq_debug_struct_print(struct bbq *q) {
+ printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new");
+ uint64_t phead = bbq_atomic64_load(&q->phead.value, q->prod_single);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value, q->cons_single);
+
+ printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
+ printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));
+
+ uint64_t ph_idx = bbq_head_idx(q, phead);
+ uint64_t ch_idx = bbq_head_idx(q, chead);
+ if (ph_idx != ch_idx) {
+ printf("block[%lu]\n", ph_idx);
+ bbq_debug_block_print(q, &(q->blocks[ph_idx]));
+ }
+
+ printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead));
+ printf("block[%lu]\n", ch_idx);
+ bbq_debug_block_print(q, &(q->blocks[ch_idx]));
+}
+#endif
+
+#if 0
+/* 根据实际head以及块上的游标推算出待消费的个数,该函数很影响性能 */
+static uint32_t bbq_wait_consumed_get_from_head(struct bbq *q,
+ uint64_t *ch_ptr,
+ uint64_t *ph_ptr,
+ struct bbq_block *blk_ph) {
+ uint64_t ch = 0;
+ uint64_t ph = 0;
+ if (ch_ptr != NULL) {
+ ch = *ch_ptr;
+ } else {
+ ch = bbq_atomic64_load(&q->chead.value);
+ }
+
+ if (ph_ptr != NULL) {
+ ph = *ph_ptr;
+ } else {
+ ph = bbq_atomic64_load(&q->phead.value);
+ }
+
+ uint64_t ph_idx = bbq_head_idx(q, ph);
+ uint64_t ch_idx = bbq_head_idx(q, ch);
+ uint64_t committed_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ph->committed));
+
+ struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]);
+ uint64_t reserved_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ch->reserved));
+
+ // "生产者"超过"消费者"块的个数
+ uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx;
+ if (!BBQ_F_CHK_DROP_OLD(q->flags)) {
+ // 这里idx_diff-1=-1也是正确。
+ return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
+ }
+
+ uint64_t ch_vsn = bbq_head_vsn(q, ch);
+ uint64_t ph_vsn = bbq_head_vsn(q, ph);
+
+ if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) {
+ // drop old模式,未发生覆盖
+ return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
+ }
+
+ if (ph_idx == ch_idx) {
+ // drop old模式,发生了覆盖,当前块以及之前已生产的都作废
+ return 0;
+ }
+
+ return (idx_diff - 1) * q->bs + committed_off;
+}
+
+/* 根据head偏移判断是否为空,耗性能函数 */
+bool bbq_empty_from_head(struct bbq *q) {
+ uint64_t phead = bbq_atomic64_load(&q->phead.value);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value);
+
+ uint64_t ph_vsn = bbq_head_vsn(q, phead);
+ uint64_t ch_vsn = bbq_head_vsn(q, chead);
+ uint64_t ph_idx = bbq_head_idx(q, phead);
+ uint64_t ch_idx = bbq_head_idx(q, chead);
+
+ struct bbq_block *block;
+
+ if (ph_idx != ch_idx) {
+ return false;
+ }
+
+ block = &q->blocks[ph_idx];
+ if (ph_vsn == ch_vsn) {
+ if (bbq_cur_off(q, bbq_atomic64_load(&block->reserved)) == bbq_cur_off(q, bbq_atomic64_load(&block->committed))) {
+ return true;
+ }
+ }
+
+ uint64_t reserved = bbq_atomic64_load(&block->reserved);
+ uint64_t reserved_off = bbq_cur_off(q, reserved);
+
+ if (BBQ_F_CHK_DROP_OLD(q->flags) &&
+ ph_vsn > ch_vsn &&
+ reserved_off != q->bs) {
+ // 生产者追上了消费者,当前块以及未消费的全部
+ // 如果reserved指向当前块的最后一个entry,可以移动head消费下一块,否则返回空
+ return true;
+ }
+
+ return false;
+}
+#endif \ No newline at end of file
diff --git a/bbq/unittest/CMakeLists.txt b/bbq/unittest/CMakeLists.txt
new file mode 100644
index 0000000..fedd026
--- /dev/null
+++ b/bbq/unittest/CMakeLists.txt
@@ -0,0 +1,25 @@
+cmake_minimum_required(VERSION 3.0)
+project(BBQ_TESTS)
+
+
+# 指定库路径
+link_directories(${LIB_PATH})
+# 指定头文件
+include_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}/common/
+)
+
+# 链接静态库
+link_libraries(bbq)
+# 指定可执行文件输出路径
+set(EXECUTABLE_OUTPUT_PATH ${EXEC_PATH})
+
+# 搜索当前cmake文件所在目录下的c文件
+file(GLOB SRC_C_LIST "${CMAKE_CURRENT_SOURCE_DIR}/*.c")
+file(GLOB SRC_LIST "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
+list(APPEND SRC_LIST ${SRC_C_LIST})
+
+add_executable(bbq_unittest ${SRC_LIST}) # 添加可执行程序
+target_link_libraries(bbq_unittest gtest gtest_main pthread) # 链接gtest库
+
+add_test(bbq_unittest ${EXEC_PATH}/bbq_unittest) # 添加测试,保证make test可以执行该测试用例
diff --git a/bbq/unittest/ut_bbq.cc b/bbq/unittest/ut_bbq.cc
new file mode 100644
index 0000000..d244894
--- /dev/null
+++ b/bbq/unittest/ut_bbq.cc
@@ -0,0 +1,1189 @@
+/*
+ * @Author: liuyu
+ * @LastEditTime: 2024-07-07 21:59:54
+ * @Email: [email protected]
+ * @Describe: 简单的测试用例,测试基本功能
+ */
+
+#include "gtest/gtest.h"
+extern "C" {
+#include "ut_bbq_func.h"
+#include <math.h>
+extern bool bbq_debug_check_array_bounds(struct bbq *q);
+extern void bbq_struct_print(struct bbq *q);
+extern uint32_t bbq_enqueue_burst_elem_2d_array(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed);
+extern bool bbq_check_power_of_two(int n);
+extern unsigned bbq_ceil_log2(uint64_t x);
+extern uint64_t bbq_fetch_max(union bbq_atomic64 *atomic, uint64_t upd, bool single);
+extern bool ut_malloc_free_equal();
+extern int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs);
+extern void bbq_atomic64_store(union bbq_atomic64 *atomic, uint64_t value, bool single);
+extern uint64_t bbq_atomic64_load(union bbq_atomic64 *atomic, bool single);
+extern bool bbq_debug_check_array_bounds(struct bbq *q);
+extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ size_t obj_size, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f);
+extern uint64_t bbq_atomic64_load(union bbq_atomic64 *atomic, bool single);
+}
+
+#define BUF_CNT 4096
+
+class ut_bbq : public testing::Test {
+ protected:
+ virtual void SetUp() override {
+ // 1.清空内存malloc/free统计
+ ut_memory_counter_clear();
+
+ // 2.入队空间初始化
+ UT_DOUBLE_PTR_DATA_INIT(enq_table1, uint16_t, BUF_CNT);
+ UT_PTR_ARRAY_DATA_INIT(enq_table2, uint16_t, BUF_CNT);
+ UT_ARRAY_DATA_INIT(enq_table3, BUF_CNT);
+ }
+
+ virtual void TearDown() override {
+ // 1.释放测试数据
+ UT_DOUBLE_PTR_DATA_DESTORY(enq_table1, BUF_CNT);
+ UT_PTR_ARRAY_DATA_DESTORY(enq_table2, BUF_CNT);
+
+ // 2.内存泄漏检测
+ EXPECT_TRUE(ut_malloc_free_equal());
+ }
+
+ // 入队数据
+ uint16_t **enq_table1;
+ uint16_t *enq_table2[BUF_CNT];
+ uint16_t enq_table3[BUF_CNT];
+};
+
+TEST_F(ut_bbq, single_retry_new_cp_ptr) {
+ int ret = 0;
+ uint64_t cnt = 0;
+ uint16_t *deq_data = NULL;
+
+ // 创建队列
+ struct bbq *q = bbq_create("ut_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY,
+ BBQ_F_RETRY_NEW, ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+
+ // 全部入队成功
+ for (uint32_t i = 0; i < 4000; i++) {
+ if (bbq_enqueue(q, (void **)&enq_table1[i]) == 0) {
+ cnt++;
+ }
+ }
+
+ // 部分入队成功
+ for (uint32_t i = 0; i < 4000; i++) {
+ if (bbq_enqueue(q, (void **)&enq_table2[i]) == 0) {
+ cnt++;
+ }
+ }
+
+ // 入队成功个数等于队列总数
+ EXPECT_EQ(cnt, BUF_CNT);
+
+ cnt = 0;
+ for (uint32_t i = 0; i < BUF_CNT; i++) {
+ ret = bbq_dequeue(q, (void **)&deq_data);
+ if (ret == 0) {
+ EXPECT_EQ(*deq_data, UT_DATA_MAGIC);
+ cnt++;
+ }
+ }
+ // 全部出队成功
+ EXPECT_EQ(cnt, BUF_CNT);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, single_retry_new_cp_value) {
+ int ret = 0;
+ uint64_t cnt = 0;
+ uint16_t deq_data;
+
+ // 创建队列
+ struct bbq *q = bbq_create_elem("ut_bbq", BUF_CNT, sizeof(uint16_t),
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+
+ // 全部入队成功
+ for (uint32_t i = 0; i < 4000; i++) {
+ if (bbq_enqueue(q, (void **)enq_table1[i]) == 0) {
+ cnt++;
+ }
+ }
+
+ // 部分入队成功
+ for (uint32_t i = 0; i < 4000; i++) {
+ if (bbq_enqueue_elem(q, enq_table2[i]) == 0) {
+ cnt++;
+ }
+ }
+
+ // 入队成功个数等于队列总数
+ EXPECT_EQ(cnt, BUF_CNT);
+
+ cnt = 0;
+ for (uint32_t i = 0; i < BUF_CNT; i++) {
+ ret = bbq_dequeue_elem(q, &deq_data);
+ if (ret == 0) {
+ EXPECT_EQ(deq_data, UT_DATA_MAGIC);
+ cnt++;
+ }
+ }
+ // 全部出队成功
+ EXPECT_EQ(cnt, BUF_CNT);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY);
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, single_drop_old_cp_pointer) {
+ int ret = 0;
+ uint64_t cnt = 0;
+ uint16_t *deq_data = NULL;
+ uint64_t first_cnt = BUF_CNT;
+ uint64_t second_cnt = 1000;
+
+ // 创建队列
+ struct bbq *q = bbq_create("ut_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY,
+ BBQ_F_DROP_OLD, ut_malloc_def_callback,
+ ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_LT(second_cnt, q->bs * q->bn);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+
+ // 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈)
+ uint32_t loop = 3;
+ for (uint32_t n = 0; n < loop; n++) {
+ for (uint32_t i = 0; i < first_cnt; i++) {
+ ret = bbq_enqueue(q, (void **)&enq_table1[i]);
+ if (ret == 0) {
+ cnt++;
+ }
+ }
+ }
+ EXPECT_EQ(cnt, first_cnt * loop);
+
+ // 全部入队成功
+ cnt = 0;
+ for (uint32_t i = 0; i < second_cnt; i++) {
+ if (bbq_enqueue(q, (void **)&enq_table2[i]) == 0) {
+ cnt++;
+ }
+ }
+ EXPECT_EQ(cnt, second_cnt);
+
+ cnt = 0;
+ for (uint32_t i = 0; i < BUF_CNT; i++) {
+ ret = bbq_dequeue(q, (void **)&deq_data);
+ if (ret == 0) {
+ EXPECT_EQ(*deq_data, UT_DATA_MAGIC);
+ cnt++;
+ }
+ }
+
+ // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。
+ EXPECT_EQ(cnt, second_cnt - q->bs);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue(q, (void **)&deq_data), BBQ_ERR_EMPTY);
+
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, single_drop_old_cp_value) {
+ int ret = 0;
+ uint64_t cnt = 0;
+ uint16_t deq_data;
+ uint64_t first_cnt = BUF_CNT;
+ uint64_t second_cnt = 1000;
+
+ // 创建队列
+ struct bbq *q = bbq_create_elem("ut_bbq", BUF_CNT, sizeof(uint16_t),
+ BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_LT(second_cnt, q->bs * q->bn);
+
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY);
+
+ // 全部入队成功,入队个数是BUF_CNT的整数倍,因此到了一个边界,刚好与消费者位置一致(套了loop圈)
+ uint32_t loop = 3;
+ for (uint32_t n = 0; n < loop; n++) {
+ for (uint32_t i = 0; i < first_cnt; i++) {
+ ret = bbq_enqueue_elem(q, enq_table1[i]);
+ if (ret == 0) {
+ cnt++;
+ }
+ }
+ }
+ EXPECT_EQ(cnt, first_cnt * loop);
+
+ // 全部入队成功
+ cnt = 0;
+ for (uint32_t i = 0; i < second_cnt; i++) {
+ if (bbq_enqueue_elem(q, enq_table2[i]) == 0) {
+ cnt++;
+ }
+ }
+ EXPECT_EQ(cnt, second_cnt);
+
+ cnt = 0;
+ for (uint32_t i = 0; i < BUF_CNT; i++) {
+ ret = bbq_dequeue_elem(q, &deq_data);
+ if (ret == 0) {
+ EXPECT_EQ(deq_data, UT_DATA_MAGIC);
+ cnt++;
+ }
+ }
+
+ // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。
+ EXPECT_EQ(cnt, second_cnt - q->bs);
+ // 空队出队失败
+ EXPECT_EQ(bbq_dequeue_elem(q, &deq_data), BBQ_ERR_EMPTY);
+
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, burst_retry_new_cp_value) {
+ struct bbq *q;
+ uint32_t ret1 = 0;
+ uint32_t ret2 = 0;
+ uint64_t first_cnt = 4000;
+ uint64_t second_cnt = 1000;
+ uint16_t deq_table1[BUF_CNT] = {0};
+ uint16_t *deq_table2 = (uint16_t *)ut_malloc(UT_MODULE_DATA, sizeof(uint16_t) * BUF_CNT);
+ uint32_t wait_consumed = 0;
+
+ // 创建队列
+ q = bbq_create_elem("ut_bbq", BUF_CNT, sizeof(uint16_t),
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_LT(first_cnt, q->bn * q->bs);
+
+ // 批量入队(全部成功)
+ ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed);
+ EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret1);
+
+ // 批量入队(部分成功)
+ // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。
+ ret2 = bbq_enqueue_burst_elem_2d_array(q, (void *const *)enq_table2, second_cnt, &wait_consumed);
+ EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, ret1 + ret2);
+
+ // 出队列(全部成功)
+ ret1 = bbq_dequeue_burst_elem(q, (void *)deq_table1, first_cnt, &wait_consumed);
+ EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret2);
+
+ // 出队列(部分成功)
+ ret2 = bbq_dequeue_burst_elem(q, (void *)deq_table2, second_cnt, &wait_consumed);
+ EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, 0);
+
+ // 验证数据
+ for (uint32_t i = 0; i < ret1; i++) {
+ EXPECT_EQ(deq_table1[i], UT_DATA_MAGIC) << "i :" << i;
+ }
+
+ for (uint32_t i = 0; i < ret2; i++) {
+ EXPECT_EQ(deq_table2[i], UT_DATA_MAGIC) << "i :" << i;
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ ut_free(UT_MODULE_DATA, deq_table2);
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, burst_retry_new_cp_pointer) {
+ struct bbq *q;
+ uint32_t ret1 = 0;
+ uint32_t ret2 = 0;
+ uint64_t first_cnt = 4000;
+ uint64_t second_cnt = 1000;
+ uint32_t wait_consumed = 0;
+ uint16_t *deq_table1[BUF_CNT] = {0};
+ uint16_t **deq_table2 = (uint16_t **)ut_malloc(UT_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT);
+
+ // 创建队列
+ q = bbq_create("ut_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY,
+ BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_LT(first_cnt, q->bn * q->bs);
+
+ // 批量入队(全部成功)
+ ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed);
+ EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret1);
+
+ // 批量入队(部分成功)
+ ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed);
+ EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, ret1 + ret2);
+
+ // 出队列(全部成功)
+ ret1 = bbq_dequeue_burst(q, (void **)deq_table1, first_cnt, &wait_consumed);
+ EXPECT_EQ(ret1, first_cnt);
+ EXPECT_EQ(wait_consumed, ret2);
+
+ // 出队列(部分成功)
+ ret2 = bbq_dequeue_burst(q, (void **)deq_table2, second_cnt, &wait_consumed);
+ EXPECT_EQ(ret2, BUF_CNT - ret1);
+ EXPECT_EQ(wait_consumed, 0);
+
+ // 验证数据
+ for (uint32_t i = 0; i < ret1; i++) {
+ EXPECT_EQ(*deq_table1[i], UT_DATA_MAGIC) << "i :" << i;
+ }
+
+ for (uint32_t i = 0; i < ret2; i++) {
+ EXPECT_EQ(*deq_table2[i], UT_DATA_MAGIC) << "i :" << i;
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ ut_free(UT_MODULE_DATA, deq_table2);
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, burst_drop_old_cp_pointer) {
+ struct bbq *q;
+ uint32_t ret1 = 0;
+ uint32_t ret2 = 0;
+ uint64_t first_cnt = BUF_CNT;
+ uint64_t second_cnt = 1000;
+ uint32_t wait_consumed = 0;
+ uint16_t *deq_table1[BUF_CNT] = {0};
+ uint16_t **deq_table2 = (uint16_t **)ut_malloc(UT_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT);
+
+ // 创建队列
+ q = bbq_create("ut_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_GT(second_cnt, q->bs);
+ EXPECT_LT(second_cnt, q->bs * q->bn);
+
+ // 批量入队(全部成功,入队个数等于队列总容量,未发生覆盖)
+ ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed);
+ EXPECT_EQ(ret1, first_cnt);
+ // EXPECT_EQ(wait_consumed, ret1);
+
+ // 批量入队(全部成功),覆盖了旧数据
+ ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed);
+ EXPECT_EQ(ret2, second_cnt);
+ // EXPECT_EQ(wait_consumed, second_cnt - q->bs);
+
+ // 出队列(部分成功)
+ // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。本例中第一个完整块作废。
+ ret1 = bbq_dequeue_burst(q, (void **)deq_table1, BUF_CNT, &wait_consumed);
+ EXPECT_EQ(ret1, second_cnt - q->bs);
+ // EXPECT_EQ(wait_consumed, 0);
+
+ // 验证数据
+ for (uint32_t i = 0; i < ret1; i++) {
+ EXPECT_EQ(*deq_table1[i], UT_DATA_MAGIC) << "i :" << i;
+ }
+
+ // 此时生产者和消费者在同一块上,入队个数为队列容量的N倍,由于发生了覆盖,且依旧在同一块上,数据全作废
+ for (uint32_t loop = 0; loop < 3; loop++) {
+ ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, BUF_CNT, &wait_consumed);
+ EXPECT_EQ(ret1, BUF_CNT);
+ EXPECT_TRUE(bbq_empty(q));
+ EXPECT_EQ(wait_consumed, 0);
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ ut_free(UT_MODULE_DATA, deq_table2);
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, burst_drop_old_cp_value) {
+ struct bbq *q;
+ uint32_t ret1 = 0;
+ uint32_t ret2 = 0;
+ uint64_t first_cnt = BUF_CNT;
+ uint64_t second_cnt = 1000;
+ uint32_t wait_consumed = 0;
+ uint16_t deq_table1[BUF_CNT] = {0};
+
+ // 创建队列
+ q = bbq_create_elem("ut_bbq", BUF_CNT, sizeof(uint16_t),
+ BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_GT(second_cnt, q->bs);
+ EXPECT_LT(second_cnt, q->bs * q->bn);
+
+ // 批量入队(全部成功)
+ ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed);
+ EXPECT_EQ(ret1, first_cnt);
+ // EXPECT_EQ(wait_consumed, ret1);
+
+ // 批量入队(全部成功),覆盖了旧数据
+ // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。
+ ret2 = bbq_enqueue_burst_elem_2d_array(q, (void *const *)enq_table1, second_cnt, &wait_consumed);
+ EXPECT_EQ(ret2, second_cnt);
+ // EXPECT_EQ(wait_consumed, second_cnt - q->bs);
+
+ // 出队列(部分成功)
+ // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。
+ ret1 = bbq_dequeue_burst_elem(q, (void *)deq_table1, BUF_CNT, &wait_consumed);
+ EXPECT_EQ(ret1, second_cnt - q->bs);
+ EXPECT_EQ(wait_consumed, 0);
+
+ // 验证数据
+ for (uint32_t i = 0; i < ret1; i++) {
+ EXPECT_EQ(deq_table1[i], UT_DATA_MAGIC) << "i :" << i;
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+
+typedef struct {
+ uint64_t thread_cnt;
+ bbq_atomic64 data;
+ aotmic_uint64 ready_thread_cnt;
+} ut_fetch_arg;
+
+void *fetch_max_thread_func(void *arg) {
+ ut_fetch_arg *fetch_arg = (ut_fetch_arg *)arg;
+ fetch_arg->ready_thread_cnt.fetch_add(1);
+
+ while (fetch_arg->ready_thread_cnt.load() != fetch_arg->thread_cnt) {
+ }
+
+ uint64_t *ret = (uint64_t *)ut_malloc(UT_MODULE_UTEST, sizeof(*ret));
+ // 不同线程写入不同的>3的数
+ *ret = bbq_fetch_max(&fetch_arg->data, pthread_self() + 3, false);
+ pthread_exit(ret);
+}
+
+TEST_F(ut_bbq, mpmc) {
+ struct ut_info_s info {
+ .cfg = {
+ .base = {
+ .name = {},
+ .introduce = {},
+ .core_begin = 0,
+ .core_end = 3,
+ },
+ .ring = {
+ .ring_type = UT_RING_TYPE_BBQ,
+ .producer_cnt = 4,
+ .consumer_cnt = 4,
+ .workload = UT_WORKLOAD_SIMPLE,
+ .entries_cnt = 4096,
+ .block_count = 0,
+ .burst_cnt = 4,
+ },
+ .run = {
+ .run_ok_times = 9000000,
+ .run_time = 0,
+ },
+ },
+ .ctl = {
+ .running = true,
+ .all_threads_start = {},
+ .producer_exit = {},
+ },
+ };
+
+ // 队列初始化
+ int ret = -1;
+ struct ut_queue q;
+ ret = ut_queue_init_bbq(&info.cfg, &q);
+ ASSERT_TRUE(ret == 0);
+
+ // 创建线程
+ pthread_t *threads = ut_threads_create(&info, &q);
+ ASSERT_TRUE(threads);
+
+ // 等待所有线程完成,回收数据
+ uint32_t thread_cnt = info.cfg.ring.producer_cnt + info.cfg.ring.consumer_cnt;
+ struct ut_exit_data **exit_data = (struct ut_exit_data **)ut_malloc(UT_MODULE_UTEST, sizeof(struct ut_exit_data **) * (thread_cnt));
+ ut_wait_all_threads_exit(&info, thread_cnt, threads, exit_data);
+
+ // 比较数据
+ struct ut_merge_s merge;
+ memset(&merge, 0, sizeof(merge));
+ ut_merge_all_data(exit_data, thread_cnt, &merge);
+ EXPECT_EQ(merge.consumer.data_error_cnt, 0);
+ EXPECT_EQ(merge.consumer.ok_cnt, merge.producer.ok_cnt);
+
+ // 释放数据
+ for (uint32_t i = 0; i < thread_cnt; i++) {
+ ut_exit_data_destory(exit_data[i]);
+ }
+ ut_free(UT_MODULE_UTEST, exit_data);
+ ut_threads_destory(&info, threads);
+ EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring));
+ ut_queue_destory(&q);
+}
+
+TEST_F(ut_bbq, bbq_fetch_max) {
+ uint64_t ret = 0;
+ ut_fetch_arg arg;
+
+ bool single = false;
+ arg.data.m.store(0);
+ arg.thread_cnt = 0;
+ arg.ready_thread_cnt.store(0);
+
+ bbq_atomic64_store(&arg.data, 1, single); // 初始化1
+ arg.thread_cnt = 50;
+
+ ret = bbq_fetch_max(&arg.data, 2, single); // max比较后设置为2
+ EXPECT_EQ(bbq_atomic64_load(&arg.data, single), 2);
+ EXPECT_EQ(ret, 1);
+
+ pthread_t *threads = (pthread_t *)ut_malloc(UT_MODULE_UTEST, sizeof(*threads) * arg.thread_cnt);
+ for (uint64_t i = 0; i < arg.thread_cnt; i++) {
+ // 多个线程同时fetch_max,输入 > 3的数据
+ pthread_create(&threads[i], NULL, fetch_max_thread_func, (void *)&arg);
+ }
+
+ int eq_cnt = 0;
+ for (uint64_t i = 0; i < arg.thread_cnt; i++) {
+ uint64_t *tret;
+ pthread_join(threads[i], (void **)&tret); // 等待每个线程结束
+ if (*tret == 2) {
+ eq_cnt++; // 统计返回2的个数
+ }
+ ut_free(UT_MODULE_UTEST, tret);
+ }
+
+ // EXPECT_EQ(eq_cnt, 1);
+ ut_free(UT_MODULE_UTEST, threads);
+}
+
+TEST_F(ut_bbq, power_of_two) {
+ uint32_t tmp = 0;
+ uint32_t max = pow(2, 32) - 1;
+
+ EXPECT_FALSE(bbq_check_power_of_two(0));
+
+ tmp = 3;
+ for (uint32_t val = 5; val < max; val *= tmp) {
+ EXPECT_FALSE(bbq_check_power_of_two(val));
+ if (val >= max / tmp) {
+ break; // 即将越界
+ }
+ }
+
+ tmp = 2;
+ for (uint32_t val = 1; val < max; val *= tmp) {
+ EXPECT_TRUE(bbq_check_power_of_two(val));
+ if (val >= max / tmp) {
+ break;
+ }
+ }
+}
+
+TEST_F(ut_bbq, bbq_block_number_calc) {
+ uint32_t tmp = 2;
+ uint32_t max = pow(2, 32) - 1;
+ uint32_t bn = 0, bs = 0;
+ int ret = 0;
+
+ ret = bbq_bnbs_calc(1, &bn, &bs);
+ EXPECT_EQ(ret, BBQ_ERR_OUT_OF_RANGE);
+
+ for (uint32_t val = 2; val < max; val *= tmp) {
+ ret = bbq_bnbs_calc(val, &bn, &bs);
+ EXPECT_EQ(ret, BBQ_OK);
+ if (val <= 128) {
+ EXPECT_EQ(bn, 2);
+ } else if (val <= 2048) {
+ EXPECT_EQ(bn, 4);
+ } else if (val <= 32768) {
+ EXPECT_EQ(bn, 8);
+ } else if (val <= 524288) {
+ EXPECT_EQ(bn, 16);
+ } else if (val <= 8388608) {
+ EXPECT_EQ(bn, 32);
+ } else if (val <= 134217728) {
+ EXPECT_EQ(bn, 64);
+ } else if (val <= 2147483648) {
+ EXPECT_EQ(bn, 128);
+ } else {
+ EXPECT_TRUE(0); // 异常
+ }
+
+ if (val >= max / tmp) {
+ break;
+ }
+ }
+}
+
+#define OFFSETOF(type, member) ((size_t) & ((type *)0)->member)
+#define PRINT_OFFSETOF(type, member) printf("Offset of '%s' in '%s' is %zu\n", #member, #type, OFFSETOF(type, member))
+#define PTR_ALIGNED_64(ptr) (((uintptr_t)ptr & (64 - 1)) == 0)
+#define SIZE_ALIGNED_64(v) (((sizeof(v)) & (64 - 1)) == 0)
+
+TEST_F(ut_bbq, bbq_cache_line) {
+
+ // 创建队列
+ struct bbq *q = bbq_create("ut_bbq", 4096, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // 首地址64字节对齐
+ EXPECT_EQ(PTR_ALIGNED_64(q), true);
+
+ // 关键成员64字节对齐
+ EXPECT_EQ(PTR_ALIGNED_64(&q->name), true);
+ EXPECT_EQ(PTR_ALIGNED_64(&q->socket_id), true);
+ EXPECT_EQ(PTR_ALIGNED_64(&q->phead.value), true);
+ EXPECT_EQ(PTR_ALIGNED_64(&q->chead.value), true);
+ EXPECT_EQ(PTR_ALIGNED_64(&q->blocks), true);
+ EXPECT_EQ(PTR_ALIGNED_64(&q->blocks[0].committed), true);
+ EXPECT_EQ(PTR_ALIGNED_64(&q->blocks[0].entries), true);
+
+ EXPECT_EQ(SIZE_ALIGNED_64(struct bbq_head), true);
+ EXPECT_EQ(SIZE_ALIGNED_64(struct bbq_block), true);
+
+ // PRINT_OFFSETOF(struct bbq, name);
+ // PRINT_OFFSETOF(struct bbq, socket_id);
+ // PRINT_OFFSETOF(struct bbq, phead);
+ // PRINT_OFFSETOF(struct bbq, chead);
+ // PRINT_OFFSETOF(struct bbq, blocks);
+
+ bbq_destory(q);
+}
+
+void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
+ uint64_t ph = bbq_atomic64_load(&q->phead.value, q->prod_single);
+ EXPECT_EQ(bbq_head_idx(q, ph), idx) << "line: " << line;
+ EXPECT_EQ(bbq_head_vsn(q, ph), vsn) << "line: " << line;
+}
+
+void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) {
+ uint64_t ch = bbq_atomic64_load(&q->chead.value, q->cons_single);
+ EXPECT_EQ(bbq_head_idx(q, ch), idx) << "line: " << line;
+ EXPECT_EQ(bbq_head_vsn(q, ch), vsn) << "line: " << line;
+}
+
+void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
+ uint64_t allocated = bbq_atomic64_load(&block->allocated, q->prod_single);
+ EXPECT_EQ(bbq_cur_off(q, allocated), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, allocated), vsn) << "line: " << line;
+}
+
+void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
+ uint64_t committed = bbq_atomic64_load(&block->committed, q->prod_single);
+ EXPECT_EQ(bbq_cur_off(q, committed), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, committed), vsn) << "line: " << line;
+}
+
+void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
+ uint64_t consumed = bbq_atomic64_load(&block->consumed, q->cons_single);
+ EXPECT_EQ(bbq_cur_off(q, consumed), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, consumed), vsn) << "line: " << line;
+}
+
+void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) {
+ uint64_t reserved = bbq_atomic64_load(&block->reserved, q->cons_single);
+ EXPECT_EQ(bbq_cur_off(q, reserved), off) << "line: " << line;
+ EXPECT_EQ(bbq_cur_vsn(q, reserved), vsn) << "line: " << line;
+}
+
+// 初始化状态
+TEST_F(ut_bbq, head_cursor_init) {
+ struct bbq *q;
+ uint32_t bn = 2;
+ uint32_t bs = 4;
+ q = bbq_create_elem_with_bnbs("ut_bbq", bn, bs, sizeof(int),
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目
+
+ EXPECT_EQ(bbq_atomic64_load(&q->phead.value, q->prod_single), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value, q->cons_single), 0);
+
+ expect_eq_allocated(q, &q->blocks[0], 0, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[0], 0, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[0], 0, 0, __LINE__);
+ for (uint32_t i = 1; i < bn; i++) {
+ expect_eq_allocated(q, &q->blocks[i], bs, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[i], bs, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__);
+ }
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+
+void ut_produce_something(uint32_t produce_cnt) {
+ int ret = 0;
+ struct bbq *q;
+ uint32_t bn = 8;
+ uint32_t bs = 4096;
+ int enqueue_data = UT_DATA_MAGIC;
+ int dequeue_data = 0;
+
+ EXPECT_GT(produce_cnt, 0);
+ EXPECT_LE(produce_cnt, bs);
+
+ q = bbq_create_elem_with_bnbs("ut_bbq", bn, bs, sizeof(int),
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // 生产produce_cnt
+ for (uint32_t i = 0; i < produce_cnt; i++) {
+ ret = bbq_enqueue_elem(q, &enqueue_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ }
+
+ EXPECT_EQ(bbq_atomic64_load(&q->phead.value, q->prod_single), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value, q->cons_single), 0);
+ expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[0], 0, 0, __LINE__);
+
+ // 消费完
+ for (uint32_t i = 0; i < produce_cnt; i++) {
+ ret = bbq_dequeue_elem(q, &dequeue_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ EXPECT_EQ(dequeue_data, UT_DATA_MAGIC);
+ }
+
+ EXPECT_EQ(bbq_atomic64_load(&q->phead.value, q->prod_single), 0);
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value, q->cons_single), 0);
+ expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[0], produce_cnt, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[0], produce_cnt, 0, __LINE__);
+
+ for (uint32_t i = 1; i < bn; i++) {
+ expect_eq_allocated(q, &q->blocks[i], bs, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[i], bs, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__);
+ }
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+// 在第一块内生产,然后被消费完
+TEST_F(ut_bbq, head_cursor_produce_something) {
+ ut_produce_something(1);
+ ut_produce_something(567);
+ ut_produce_something(789);
+ ut_produce_something(4096);
+}
+
+void ut_produce_next_block(uint32_t over) {
+ int ret = 0;
+ struct bbq *q;
+ uint32_t bn = 8;
+ uint32_t bs = 4096;
+ uint32_t produce_cnt = bs + over;
+ int enqueue_data = UT_DATA_MAGIC;
+ int dequeue_data = 0;
+
+ EXPECT_GT(over, 0);
+ EXPECT_LT(over, bs);
+
+ q = bbq_create_elem_with_bnbs("ut_bbq", bn, bs, sizeof(int),
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // 生产至第二块的第一个entry
+ for (uint32_t i = 0; i < produce_cnt; i++) {
+ ret = bbq_enqueue_elem(q, &enqueue_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ }
+
+ EXPECT_EQ(bbq_atomic64_load(&q->chead.value, q->cons_single), 0);
+ expect_phead(q, 1, 0, __LINE__);
+ expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[0], 0, 0, __LINE__);
+
+ expect_eq_allocated(q, &q->blocks[1], over, 1, __LINE__);
+ expect_eq_committed(q, &q->blocks[1], over, 1, __LINE__);
+ expect_eq_reserved(q, &q->blocks[1], bs, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[1], bs, 0, __LINE__);
+
+ // 消费完
+ for (uint32_t i = 0; i < produce_cnt; i++) {
+ ret = bbq_dequeue_elem(q, &dequeue_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ EXPECT_EQ(dequeue_data, UT_DATA_MAGIC);
+ }
+
+ expect_phead(q, 1, 0, __LINE__);
+ expect_chead(q, 1, 0, __LINE__);
+ expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__);
+ expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__);
+ expect_eq_reserved(q, &q->blocks[0], bs, 0, __LINE__);
+ expect_eq_consumed(q, &q->blocks[0], bs, 0, __LINE__);
+
+ expect_eq_allocated(q, &q->blocks[1], over, 1, __LINE__);
+ expect_eq_committed(q, &q->blocks[1], over, 1, __LINE__);
+ expect_eq_reserved(q, &q->blocks[1], over, 1, __LINE__);
+ expect_eq_consumed(q, &q->blocks[1], over, 1, __LINE__);
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+
+// 第一块生产完毕,第二块生产了若干,然后被消费完
+TEST_F(ut_bbq, head_cursor_produce_next_block) {
+ ut_produce_next_block(1);
+ ut_produce_next_block(123);
+ ut_produce_next_block(456);
+ ut_produce_next_block(4095);
+}
+
+void ut_produce_all_loop(uint32_t loop) {
+ int ret = 0;
+ struct bbq *q;
+ uint32_t bn = 8;
+ uint32_t bs = 4096;
+ uint32_t produce_cnt = bn * bs;
+ int enqueue_data = UT_DATA_MAGIC;
+ int dequeue_data = 0;
+
+ q = bbq_create_elem_with_bnbs("ut_bbq", bn, bs, sizeof(int),
+ BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ for (uint32_t cnt = 0; cnt < loop; cnt++) {
+ // 所有entry生产完毕
+ for (uint32_t i = 0; i < produce_cnt; i++) {
+ ret = bbq_enqueue_elem(q, &enqueue_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ }
+
+ // 消费完
+ for (uint32_t i = 0; i < produce_cnt; i++) {
+ ret = bbq_dequeue_elem(q, &dequeue_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ EXPECT_EQ(dequeue_data, UT_DATA_MAGIC);
+ }
+ }
+
+ expect_phead(q, bn - 1, loop - 1, __LINE__);
+ expect_chead(q, bn - 1, loop - 1, __LINE__);
+
+ expect_eq_allocated(q, &q->blocks[0], bs, loop - 1, __LINE__);
+ expect_eq_committed(q, &q->blocks[0], bs, loop - 1, __LINE__);
+ expect_eq_reserved(q, &q->blocks[0], bs, loop - 1, __LINE__);
+ expect_eq_consumed(q, &q->blocks[0], bs, loop - 1, __LINE__);
+
+ for (uint32_t i = 1; i < bn; i++) {
+ expect_eq_allocated(q, &q->blocks[i], bs, loop, __LINE__);
+ expect_eq_committed(q, &q->blocks[i], bs, loop, __LINE__);
+ expect_eq_reserved(q, &q->blocks[i], bs, loop, __LINE__);
+ expect_eq_consumed(q, &q->blocks[i], bs, loop, __LINE__);
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+
+// 完成多轮的满生产和满消费
+TEST_F(ut_bbq, head_cursor_produce_all_loop) {
+ ut_produce_all_loop(1);
+ ut_produce_all_loop(10);
+ ut_produce_all_loop(23);
+ ut_produce_all_loop(79);
+}
+
+TEST_F(ut_bbq, head_cursor_retry_new_full_empty) {
+ int ret = 0;
+ uint32_t entries_cnt = 4096;
+ uint32_t loop = 1000;
+ struct bbq *q;
+ uint64_t ph = 0;
+ uint64_t ch = 0;
+ int *data = (int *)ut_malloc(UT_MODULE_UTEST, sizeof(*data) * entries_cnt);
+ int tmp_data = 0;
+ EXPECT_TRUE(data);
+
+ q = bbq_create_elem("ut_bbq", entries_cnt, sizeof(int), BBQ_SOCKET_ID_ANY,
+ BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ EXPECT_TRUE(bbq_empty(q));
+
+ for (uint32_t i = 0; i < loop; i++) {
+ // 入满队
+ for (uint32_t j = 0; j < entries_cnt; j++) {
+ data[j] = (i + 1) * j;
+ ret = bbq_enqueue_elem(q, &data[j]);
+ EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+ EXPECT_FALSE(bbq_empty(q));
+ }
+
+ // 满队再入队
+ for (uint32_t j = 0; j < entries_cnt / 3; j++) {
+ ret = bbq_enqueue_elem(q, &data[j]);
+ EXPECT_TRUE(ret == BBQ_ERR_FULL);
+ }
+
+ ph = bbq_atomic64_load(&q->phead.value, q->prod_single);
+ ch = bbq_atomic64_load(&q->chead.value, q->cons_single);
+ if (i == 0) {
+ EXPECT_EQ((ph + 1) & q->idx_mask, ch & q->idx_mask);
+ } else {
+ EXPECT_EQ((ph)&q->idx_mask, ch & q->idx_mask);
+ }
+
+ for (uint32_t i = 0; i < q->bn; i++) {
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed, q->prod_single) & q->off_mask, q->bs);
+ EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated, q->prod_single) & q->off_mask, q->bs);
+ }
+
+ // 全出队
+ for (uint32_t j = 0; j < entries_cnt; j++) {
+ EXPECT_FALSE(bbq_empty(q));
+ ret = bbq_dequeue_elem(q, &tmp_data);
+ EXPECT_TRUE(ret == BBQ_OK);
+ EXPECT_EQ(tmp_data, data[j]);
+ }
+
+ EXPECT_TRUE(bbq_empty(q));
+ // 空出队再出队
+ for (uint32_t j = 0; j < entries_cnt / 2; j++) {
+ ret = bbq_dequeue_elem(q, &tmp_data);
+ EXPECT_TRUE(ret == BBQ_ERR_EMPTY);
+ }
+
+ ph = bbq_atomic64_load(&q->phead.value, q->prod_single);
+ ch = bbq_atomic64_load(&q->chead.value, q->cons_single);
+ EXPECT_EQ(ph & q->idx_mask, ch & q->idx_mask);
+ for (uint32_t i = 0; i < q->bn; i++) {
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed, q->prod_single) & q->off_mask, q->bs);
+ EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated, q->prod_single) & q->off_mask, q->bs);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed, q->cons_single) & q->off_mask, q->bs);
+ EXPECT_GE(bbq_atomic64_load(&q->blocks[i].reserved, q->cons_single) & q->off_mask, q->bs);
+ }
+ }
+
+ ut_free(UT_MODULE_UTEST, data);
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, head_cursor_mpsc_faa) {
+ struct ut_info_s info = {
+ .cfg = {
+ .base = {
+ .name = {},
+ .introduce = {},
+ .core_begin = 0,
+ .core_end = 3,
+ },
+ .ring = {
+ .ring_type = UT_RING_TYPE_BBQ,
+ .producer_cnt = 10,
+ .consumer_cnt = 1,
+ .workload = UT_WORKLOAD_SIMPLE,
+ .entries_cnt = 4,
+ .block_count = 1,
+ .burst_cnt = 1,
+ },
+ .run = {
+ .run_ok_times = 9000000,
+ .run_time = 0,
+ },
+ },
+ .ctl = {
+ .running = true,
+ .all_threads_start = {},
+ .producer_exit = {},
+ },
+ };
+
+ // 队列初始化
+ int ret = -1;
+ struct ut_queue q;
+ ret = ut_queue_init_bbq(&info.cfg, &q);
+ ASSERT_TRUE(ret == 0);
+
+ // 创建线程
+ pthread_t *threads = ut_threads_create(&info, &q);
+ ASSERT_TRUE(threads);
+
+ // 等待所有线程完成,回收数据
+ uint32_t thread_cnt = info.cfg.ring.producer_cnt + info.cfg.ring.consumer_cnt;
+ struct ut_exit_data **exit_data = (struct ut_exit_data **)ut_malloc(UT_MODULE_UTEST, sizeof(struct ut_exit_data **) * (thread_cnt));
+ uint32_t i = 0;
+
+ ut_wait_all_threads_exit(&info, thread_cnt, threads, exit_data);
+
+ // 比较数据
+ struct ut_merge_s merge;
+ memset(&merge, 0, sizeof(merge));
+
+ ut_merge_all_data(exit_data, thread_cnt, &merge);
+ EXPECT_EQ(merge.consumer.data_error_cnt, 0);
+ EXPECT_EQ(merge.consumer.ok_cnt, merge.producer.ok_cnt);
+
+ // 释放数据
+ for (i = 0; i < thread_cnt; i++) {
+ ut_exit_data_destory(exit_data[i]);
+ }
+ ut_free(UT_MODULE_UTEST, exit_data);
+ ut_threads_destory(&info, threads);
+ EXPECT_TRUE(bbq_debug_check_array_bounds((struct bbq *)q.ring));
+ ut_queue_destory(&q);
+}
+
+TEST_F(ut_bbq, head_cursor_drop_old_full_empty) {
+ int ret = 0;
+ uint32_t bn = 2;
+ uint32_t bs = 4;
+ uint32_t loop = 1000;
+ struct bbq *q;
+
+ int tmp_data = 0;
+ q = bbq_create_elem_with_bnbs("ut_bbq", bn, bs, sizeof(int),
+ BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+ // EXPECT_TRUE(bbq_empty(q));
+
+ for (uint32_t j = 0; j < loop; j++) {
+ // 入满队列
+ for (uint32_t i = 0; i < bn * bs; i++) {
+ ret = bbq_enqueue_elem(q, &i);
+ EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+ // EXPECT_FALSE(bbq_empty(q));
+ }
+
+ // 全出队
+ for (uint32_t i = 0; i < bn * bs; i++) {
+ // EXPECT_FALSE(bbq_empty(q));
+ ret = bbq_dequeue_elem(q, &tmp_data);
+ EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+ EXPECT_EQ(tmp_data, i);
+ }
+
+ // EXPECT_TRUE(bbq_empty(q));
+ // 空队再出队,失败
+ for (uint32_t i = 0; i < bn * bs; i++) {
+ ret = bbq_dequeue_elem(q, &tmp_data);
+ EXPECT_TRUE(ret == BBQ_ERR_EMPTY) << "ret " << ret;
+ }
+
+ expect_phead(q, bn - 1, j, __LINE__);
+ expect_chead(q, bn - 1, j, __LINE__);
+ for (uint32_t i = 0; i < q->bn; i++) {
+ expect_eq_committed(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__);
+ expect_eq_allocated(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__);
+ expect_eq_reserved(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed, q->cons_single), 0);
+ }
+ }
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+}
+
+TEST_F(ut_bbq, head_cursor_drop_old_full_empty_cover) {
+ int ret = 0;
+ uint32_t bn = 2;
+ uint32_t bs = 4;
+ uint32_t loop = 1000;
+ uint32_t over_cnt = bs + 2;
+ struct bbq *q;
+
+ EXPECT_EQ(over_cnt / bs, 1);
+
+ int tmp_data = 0;
+ q = bbq_create_elem_with_bnbs("ut_bbq", bn, bs, sizeof(int),
+ BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD,
+ ut_malloc_def_callback, ut_free_def_callback);
+ ASSERT_NE(q, nullptr);
+
+ // EXPECT_TRUE(bbq_empty(q));
+ // 入满队列,再入over_cnt
+ for (uint32_t i = 0; i < bn * bs * loop + over_cnt; i++) {
+ ret = bbq_enqueue_elem(q, &i);
+ EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+
+ // uint32_t tmpA = i / (bn * bs);
+ // uint32_t tmpB = i % (bn * bs);
+ // if (tmpA > 0 && (tmpB < bs)) {
+ // // 覆盖第一个块时,整个块被作废,因此都是empty,从第二个块开始可读取
+ // EXPECT_TRUE(bbq_empty(q)) << "i " << i << "tmpA " << tmpA << "tmpB " << tmpB;
+ // } else {
+ // EXPECT_FALSE(bbq_empty(q));
+ // }
+ }
+
+ expect_phead(q, 1, loop, __LINE__);
+ expect_chead(q, 0, 0, __LINE__);
+ // 检查每一个block上游标的正确性
+ for (uint32_t i = 0; i < bn; i++) {
+ expect_eq_committed(q, &q->blocks[i],
+ i == bn - 1 ? over_cnt - bs : bs,
+ i == 0 ? loop : loop + 1,
+ __LINE__);
+ expect_eq_allocated(q, &q->blocks[i],
+ i == bn - 1 ? over_cnt - bs : bs,
+ i == 0 ? loop : loop + 1,
+ __LINE__);
+ expect_eq_reserved(q, &q->blocks[i],
+ i == 0 ? 0 : bs, 0,
+ __LINE__);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed, false), 0);
+ }
+
+ // 队列中的数据全出队
+ for (uint32_t i = 0; i < over_cnt - bs; i++) {
+ ret = bbq_dequeue_elem(q, &tmp_data);
+ EXPECT_TRUE(ret == BBQ_OK) << "ret " << ret;
+ }
+
+ for (uint32_t i = 0; i < bn * bs; i++) {
+ // EXPECT_TRUE(bbq_empty(q));
+ ret = bbq_dequeue_elem(q, &tmp_data);
+ EXPECT_TRUE(ret == BBQ_ERR_EMPTY) << "ret " << ret;
+ }
+
+ expect_chead(q, 1, 0, __LINE__);
+ for (uint32_t i = 0; i < bn; i++) {
+ expect_eq_committed(q, &q->blocks[i],
+ i == bn - 1 ? over_cnt - bs : bs,
+ i == 0 ? loop : loop + 1,
+ __LINE__);
+ expect_eq_allocated(q, &q->blocks[i],
+ i == bn - 1 ? over_cnt - bs : bs,
+ i == 0 ? loop : loop + 1,
+ __LINE__);
+ expect_eq_reserved(q, &q->blocks[i],
+ i == bn - 1 ? over_cnt - bs : bs,
+ i == 1 ? loop + 1 : 0, __LINE__);
+ EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed, false), 0);
+ }
+
+ EXPECT_TRUE(bbq_debug_check_array_bounds(q));
+ bbq_destory(q);
+} \ No newline at end of file
diff --git a/bbq/unittest/ut_bbq_func.c b/bbq/unittest/ut_bbq_func.c
new file mode 100644
index 0000000..88910ca
--- /dev/null
+++ b/bbq/unittest/ut_bbq_func.c
@@ -0,0 +1,615 @@
+/*
+ * @Author: liuyu
+ * @LastEditTime: 2024-07-07 22:34:03
+ * @Email: [email protected]
+ * @Describe: TODO
+ */
+
+#include "ut_bbq_func.h"
+#include "bbq.h"
+#include <pthread.h>
+#include <string.h>
+#include <sys/prctl.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+struct ut_memory {
+ aotmic_uint64 malloc_cnt;
+ aotmic_uint64 free_cnt;
+};
+
+struct ut_memory ut_memory_g[UT_MODULE_MAX] = {0};
+
+char *ut_ring_type_map[UT_RING_TYPE_MAX] = {
+ [UT_RING_TYPE_BBQ] = UT_RING_TYPE_BBQ_STR,
+ [UT_RING_TYPE_DPDK] = UT_RING_TYPE_DPDK_STR,
+ [UT_RING_TYPE_RMIND] = UT_RING_TYPE_RMIND_STR,
+};
+
+void *ut_malloc(enum ut_module module, size_t size) {
+ void *ptr = malloc(size);
+ if (ptr != NULL) {
+ atomic_fetch_add(&ut_memory_g[module].malloc_cnt, 1);
+ }
+
+ return ptr;
+}
+
+void ut_free(enum ut_module module, void *ptr) {
+ if (ptr != NULL) {
+ atomic_fetch_add(&ut_memory_g[module].free_cnt, 1);
+ }
+ free(ptr);
+}
+
+bool ut_malloc_free_equal() {
+ bool ret = true;
+ for (int i = 0; i < UT_MODULE_MAX; i++) {
+ uint64_t malloc_cnt = atomic_load(&ut_memory_g[i].malloc_cnt);
+ uint64_t free_cnt = atomic_load(&ut_memory_g[i].free_cnt);
+ if (malloc_cnt != free_cnt) {
+ UT_ERR_LOG("[module:%d] malloc:%lu free:%lu, test malloc-free not equal\n", i, malloc_cnt, free_cnt);
+ ret = false;
+ }
+ }
+
+ return ret;
+}
+
+void ut_memory_counter_clear() {
+ memset(ut_memory_g, 0, sizeof(ut_memory_g));
+}
+
+void ut_memory_counter_print() {
+ for (int i = 0; i < UT_MODULE_MAX; i++) {
+ uint64_t malloc_cnt = atomic_load(&ut_memory_g[i].malloc_cnt);
+ uint64_t free_cnt = atomic_load(&ut_memory_g[i].free_cnt);
+ if (malloc_cnt == 0 && free_cnt == 0) {
+ continue;
+ }
+
+ UT_INFO_LOG("[%d]test malloc:%lu free:%lu", i,
+ atomic_load(&ut_memory_g[i].malloc_cnt),
+ atomic_load(&ut_memory_g[i].free_cnt));
+ }
+
+ if (ut_malloc_free_equal()) {
+ UT_INFO_LOG("all memory free");
+ } else {
+ UT_ERR_LOG("memory not all free");
+ }
+}
+
+struct ut_metric ut_clock_time_get() {
+ struct ut_metric metric = {0};
+ clock_gettime(CLOCK_REALTIME, &metric.timestamp); // 系统实时时间,随系统实时时间改变而改变
+ return metric;
+}
+
+uint64_t ut_clock_time_to_ns(struct ut_metric *metric) {
+ return metric->timestamp.tv_nsec + metric->timestamp.tv_sec * 1000 * 1000 * 1000;
+}
+
+double ut_clock_time_to_double(struct ut_metric *metric) {
+ return metric->timestamp.tv_sec +
+ metric->timestamp.tv_nsec * 1.0 / 1000 / 1000 / 1000;
+}
+
+bool ut_clock_time_is_zero(struct ut_metric *metric) {
+ return metric->timestamp.tv_sec == 0 && metric->timestamp.tv_nsec == 0;
+}
+
+bool ut_timespec_is_after(const struct timespec *a, const struct timespec *b) {
+ if (a->tv_sec > b->tv_sec) {
+ // a的秒数大于b的秒数,所以a在b之后
+ return true;
+ } else if (a->tv_sec == b->tv_sec && a->tv_nsec > b->tv_nsec) {
+ // a和b的秒数相同,但a的纳秒数大于b的纳秒数,所以a在b之后
+ return true;
+ }
+ // 否则,a不在b之后
+ return false;
+}
+
+struct ut_metric ut_clock_time_sub(struct ut_metric now, struct ut_metric last) {
+ struct ut_metric diff = {
+ .timestamp.tv_sec = now.timestamp.tv_sec - last.timestamp.tv_sec,
+ .timestamp.tv_nsec = now.timestamp.tv_nsec - last.timestamp.tv_nsec,
+ };
+
+ if (now.timestamp.tv_nsec > last.timestamp.tv_nsec) {
+ diff.timestamp.tv_nsec = now.timestamp.tv_nsec - last.timestamp.tv_nsec;
+ } else {
+ // 从秒借位
+ diff.timestamp.tv_sec--;
+ diff.timestamp.tv_nsec = 1000 * 1000 * 1000 + now.timestamp.tv_nsec - last.timestamp.tv_nsec;
+ }
+
+ return diff;
+}
+
+enum ut_workload ut_workload_str2enum(const char *workload) {
+ if (strcmp(workload, "simple") == 0) {
+ return UT_WORKLOAD_SIMPLE;
+ } else if (strcmp(workload, "complex") == 0) {
+ return UT_WORKLOAD_COMPLEX;
+ }
+
+ return UT_WORKLOAD_MAX;
+}
+
+enum ut_ring_type ut_ring_type_str2enum(const char *ring_type) {
+ if (strcmp(ring_type, UT_RING_TYPE_BBQ_STR) == 0) {
+ return UT_RING_TYPE_BBQ;
+ } else if (strcmp(ring_type, UT_RING_TYPE_DPDK_STR) == 0) {
+ return UT_RING_TYPE_DPDK;
+ } else if (strcmp(ring_type, UT_RING_TYPE_RMIND_STR) == 0) {
+ return UT_RING_TYPE_RMIND;
+ }
+
+ return UT_RING_TYPE_MAX;
+}
+
+char *ut_ring_type_enum2str(enum ut_ring_type ring_type) {
+ if (ring_type >= UT_RING_TYPE_MAX) {
+ return "unknown";
+ } else {
+ return ut_ring_type_map[ring_type];
+ }
+}
+
+int ut_setaffinity(int core_id) {
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+ CPU_SET(core_id, &mask);
+
+ if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) == -1) {
+ UT_ERR_LOG("pthread_setaffinity_np erro\n");
+ return BBQ_ERR;
+ }
+
+ return BBQ_OK;
+}
+
+void *ut_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size) {
+ return malloc(size);
+ // return aligned_alloc(BBQ_CACHE_LINE, size);
+}
+
+void ut_free_def_callback(void *ptr,
+ size_t size __attribute__((unused))) {
+ free(ptr);
+}
+
+uint32_t ut_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
+ UT_AVOID_WARNING(thread_idx);
+ return bbq_enqueue_burst(ring, obj_table, n, wait_consumed);
+}
+
+int ut_queue_init_bbq(struct ut_cfg *cfg, struct ut_queue *q) {
+#if 0
+ // 开启了BBQ_F_ENABLE_STAT 会导致性能下降
+ unsigned int flags = BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT;
+#else
+ unsigned int flags = BBQ_F_RETRY_NEW;
+#endif
+
+ if (cfg->ring.producer_cnt <= 1) {
+ flags |= BBQ_F_SP_ENQ;
+ }
+
+ if (cfg->ring.consumer_cnt <= 1) {
+ flags |= BBQ_F_SC_DEQ;
+ }
+
+ if (cfg->ring.block_count == 0) {
+ q->ring = bbq_create("ut_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, flags,
+ ut_malloc_def_callback, ut_free_def_callback);
+ } else {
+ q->ring = bbq_create_with_bnbs("ut_bbq", cfg->ring.block_count,
+ cfg->ring.entries_cnt / cfg->ring.block_count,
+ BBQ_SOCKET_ID_ANY, flags, ut_malloc_def_callback, ut_free_def_callback);
+ }
+
+ if (q->ring == NULL) {
+ UT_ERR_LOG("bbq create queue failed");
+ return BBQ_ERR_INPUT_NULL;
+ }
+
+ q->ring_free_f = (ut_ring_free_f)bbq_destory;
+ q->enqueue_f = (ut_ring_enqueue_f)bbq_enqueue;
+ q->dequeue_f = (ut_ring_dequeue_f)bbq_dequeue;
+ q->enqueue_burst_f = (ut_enqueue_burst_f)ut_bbq_enqueue_burst;
+ q->dequeue_burst_f = (ut_dequeue_burst_f)bbq_dequeue_burst;
+ return 0;
+}
+
+void ut_queue_destory(struct ut_queue *q) {
+ if (q != NULL && q->ring_free_f != NULL) {
+ q->ring_free_f(q->ring);
+ }
+}
+
+bool ut_all_producer_exit(struct ut_info_s *ut_info) {
+ return atomic_load(&ut_info->ctl.producer_exit) == ut_info->cfg.ring.producer_cnt;
+}
+
+void ut_wait_all_threads_ready(struct ut_ctl *ctl) {
+ pthread_barrier_wait(&ctl->all_threads_start);
+ UT_DBG_LOG("thread init done!");
+}
+
+struct ut_exit_data *ut_exit_data_create(struct ut_thread_arg *t_arg) {
+ struct ut_exit_data *exit_data = (struct ut_exit_data *)ut_malloc(UT_MODULE_COMMON, sizeof(struct ut_exit_data));
+ if (exit_data == NULL) {
+ UT_ERR_LOG("malloc failed");
+ exit(-1);
+ }
+
+ size_t size = t_arg->info->cfg.ring.entries_cnt;
+ exit_data->simple_data_cnt = size;
+ exit_data->simple_data = ut_data_create(size, UT_DATA_MAGIC_TYPE);
+
+ if (exit_data->simple_data == NULL) {
+ UT_ERR_LOG("malloc failed");
+ exit(-1);
+ }
+ exit_data->arg = t_arg;
+ exit_data->thread_id = pthread_self();
+ exit_data->latency_ns = 0;
+ exit_data->data_error_cnt = 0;
+
+ return exit_data;
+}
+
+void ut_exit_data_destory(struct ut_exit_data *data) {
+ ut_data_destory(data->simple_data, data->simple_data_cnt);
+ ut_free(UT_MODULE_COMMON, data->arg);
+ ut_free(UT_MODULE_COMMON, data);
+}
+
+struct ut_data **ut_data_create(size_t cnt, enum ut_data_type data_type) {
+ struct ut_data **simple_data = ut_malloc(UT_MODULE_DATA, sizeof(*simple_data) * cnt);
+ struct ut_metric enqueue_time = ut_clock_time_get();
+ for (size_t i = 0; i < cnt; i++) {
+ simple_data[i] = ut_malloc(UT_MODULE_DATA, sizeof(*simple_data[i]));
+ if (data_type == UT_DATA_MAGIC_TYPE) {
+ simple_data[i]->data = UT_DATA_MAGIC;
+ } else {
+ simple_data[i]->data = (uintptr_t)(simple_data[i]);
+ }
+ simple_data[i]->enqueue_time = enqueue_time;
+ }
+
+ return simple_data;
+}
+
+void ut_data_destory(struct ut_data **data, size_t cnt) {
+ for (size_t i = 0; i < cnt; i++) {
+ ut_free(UT_MODULE_DATA, data[i]);
+ }
+ ut_free(UT_MODULE_DATA, data);
+}
+
+uint32_t ut_exec_enqueue(struct ut_queue *q, struct ut_data **data, size_t burst_cnt,
+ struct ut_metric *op_use_diff, uint16_t thread_idx) {
+ uint32_t enqueue_cnt = 0;
+ struct ut_metric op_use_start = ut_clock_time_get();
+ uint32_t wait_consumed = 0;
+ enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, &wait_consumed);
+ *op_use_diff = ut_clock_time_sub(ut_clock_time_get(), op_use_start);
+
+ return enqueue_cnt;
+}
+
+uint32_t ut_exec_dequeue(struct ut_queue *q, struct ut_data **data, size_t burst_cnt, struct ut_metric *op_use_diff) {
+ uint32_t dequeue_cnt = 0;
+
+ struct ut_metric op_use_start = ut_clock_time_get();
+ dequeue_cnt = q->dequeue_burst_f(q->ring, (void **)data, burst_cnt, NULL);
+ *op_use_diff = ut_clock_time_sub(ut_clock_time_get(), op_use_start);
+
+ return dequeue_cnt;
+}
+
+void *ut_thread_producer_start(void *arg) {
+ uint32_t enqueue_cnt = 0;
+ uint64_t ok_cnt = 0;
+ uint64_t run_times = 0;
+ struct ut_thread_arg *t_arg = (struct ut_thread_arg *)arg;
+ struct ut_info_s *info = t_arg->info;
+ struct ut_cfg *cfg = &info->cfg;
+ struct ut_queue *q = t_arg->q;
+ struct ut_exit_data *exit_data = ut_exit_data_create(t_arg);
+
+ char thread_name[128] = {0};
+ uint64_t op_ok_latency_ns = 0;
+ uint64_t op_err_latency_ns = 0;
+ uint64_t run_ok_times = cfg->run.run_ok_times / cfg->ring.producer_cnt;
+ struct ut_metric op_latency = {0};
+ snprintf(thread_name, sizeof(thread_name), "producer:%lu", exit_data->thread_id);
+ prctl(PR_SET_NAME, thread_name);
+ if (ut_setaffinity(t_arg->core) != BBQ_OK) {
+ UT_ERR_LOG("ut_setaffinity error");
+ exit(-1);
+ }
+
+ ut_wait_all_threads_ready(&info->ctl);
+ UT_INFO_LOG("producer thread:%lx, core:%d", exit_data->thread_id, t_arg->core);
+
+ exit_data->metric_start = ut_clock_time_get();
+ while (true) {
+ if ((run_ok_times > 0 && ok_cnt >= run_ok_times) || (!info->ctl.running)) {
+ // 控制次数的循环或运行时间到了
+ break;
+ }
+
+ if (cfg->ring.workload == UT_WORKLOAD_SIMPLE) {
+ enqueue_cnt = ut_exec_enqueue(q, exit_data->simple_data, cfg->ring.burst_cnt, &op_latency, t_arg->thread_idx);
+ } else {
+ struct ut_data **data = ut_data_create(cfg->ring.burst_cnt, UT_DATA_UINTPTR_TYPE);
+ if (data == NULL) {
+ UT_ERR_LOG("malloc falied");
+ exit(-1);
+ }
+
+ enqueue_cnt = ut_exec_enqueue(q, data, cfg->ring.burst_cnt, &op_latency, t_arg->thread_idx);
+ // 释放未入队的内存
+ for (uint32_t i = enqueue_cnt; i < cfg->ring.burst_cnt; i++) {
+ ut_free(UT_MODULE_DATA, data[i]);
+ }
+
+ ut_free(UT_MODULE_DATA, data);
+ }
+
+ if (enqueue_cnt > 0) {
+ ok_cnt += enqueue_cnt;
+ op_ok_latency_ns += ut_clock_time_to_ns(&op_latency);
+ } else {
+ op_err_latency_ns += ut_clock_time_to_ns(&op_latency);
+ }
+
+ run_times++;
+ }
+
+ exit_data->metric_end = ut_clock_time_get();
+ exit_data->run_times = run_times;
+ exit_data->ok_cnt = ok_cnt;
+
+ exit_data->op_ok_latency_ns = op_ok_latency_ns;
+ exit_data->op_err_latency_ns = op_err_latency_ns;
+ atomic_fetch_add(&info->ctl.producer_exit, 1);
+
+ UT_DBG_LOG("producer-----> en_ok:%lu", ok_cnt);
+ pthread_exit(exit_data);
+}
+
+void *ut_thread_consumer_start(void *arg) {
+ uint32_t deq_cnt = -1;
+ uint64_t ok_cnt = 0;
+ uint64_t run_times = 0;
+ struct ut_thread_arg *t_arg = (struct ut_thread_arg *)arg;
+ struct ut_info_s *info = t_arg->info;
+ struct ut_cfg *cfg = &info->cfg;
+ struct ut_queue *q = t_arg->q;
+ struct ut_exit_data *exit_data = ut_exit_data_create(t_arg);
+ uint64_t latency_ns = 0;
+ struct ut_metric op_latency = {0};
+ uint64_t op_ok_latency_ns = 0;
+ uint64_t op_err_latency_ns = 0;
+ uint64_t data_error_cnt = 0;
+ char thread_name[128] = {0};
+ struct ut_data **deq_data = ut_malloc(UT_MODULE_DATA, sizeof(*deq_data) * cfg->ring.entries_cnt);
+
+ snprintf(thread_name, sizeof(thread_name), "consumer:%lu", exit_data->thread_id);
+ prctl(PR_SET_NAME, thread_name);
+ if (ut_setaffinity(t_arg->core) != BBQ_OK) {
+ UT_ERR_LOG("ut_setaffinity error");
+ exit(-1);
+ }
+
+ ut_wait_all_threads_ready(&info->ctl);
+ UT_INFO_LOG("consumer thread:%lx, core:%d", exit_data->thread_id, t_arg->core);
+
+ exit_data->metric_start = ut_clock_time_get();
+
+ while (true) {
+ if (ut_all_producer_exit(info) && deq_cnt == 0) {
+ // 运行时间到了或是所有生产者退出了,检查生产者是否全部退出,且队列被消费完了
+ break;
+ }
+
+ deq_cnt = ut_exec_dequeue(q, deq_data, cfg->ring.burst_cnt, &op_latency);
+ if (deq_cnt > 0) {
+ for (uint32_t i = 0; i < deq_cnt; i++) {
+ struct ut_data *data = deq_data[i];
+ if (cfg->ring.workload == UT_WORKLOAD_SIMPLE) {
+ if (data->data != UT_DATA_MAGIC) {
+ UT_ERR_LOG("the obtained data is not consistent with the expectation, expect:%u actual:%lu", UT_DATA_MAGIC, data->data);
+ exit_data->data_error_cnt += 1;
+ }
+ } else {
+ struct ut_metric latency = ut_clock_time_sub(ut_clock_time_get(), data->enqueue_time);
+ if (ut_clock_time_is_zero(&data->enqueue_time)) {
+ UT_ERR_LOG("enqueue_time is 0");
+ exit(-1);
+ }
+
+ if (data->data != (uintptr_t)data) {
+ UT_ERR_LOG("the obtained data is not consistent with the expectation, expect:%lu actual:%lu", (uintptr_t)data, data->data);
+ data_error_cnt += 1;
+ }
+
+ latency_ns += ut_clock_time_to_ns(&latency);
+ ut_free(UT_MODULE_DATA, data);
+ }
+ }
+ ok_cnt += deq_cnt;
+ op_ok_latency_ns += ut_clock_time_to_ns(&op_latency);
+ } else {
+ op_err_latency_ns += ut_clock_time_to_ns(&op_latency);
+ }
+
+ run_times++;
+ }
+
+ exit_data->metric_end = ut_clock_time_get();
+ exit_data->run_times = run_times;
+ exit_data->ok_cnt = ok_cnt;
+ exit_data->latency_ns = latency_ns;
+ exit_data->op_ok_latency_ns = op_ok_latency_ns;
+ exit_data->op_err_latency_ns = op_err_latency_ns;
+ exit_data->data_error_cnt = data_error_cnt;
+
+ ut_free(UT_MODULE_DATA, deq_data);
+ UT_DBG_LOG("consumer-----> de_ok:%lu", ok_cnt);
+ pthread_exit(exit_data);
+}
+
+void ut_wait_all_threads_exit(struct ut_info_s *info, uint32_t thread_cnt, pthread_t *threads, struct ut_exit_data **exit_data) {
+ if (info->cfg.run.run_time > 0) {
+ UT_DBG_LOG("sleep %lus, and notify all threads to exit...", info->cfg.run.run_time);
+ sleep(info->cfg.run.run_time);
+ info->ctl.running = false;
+ }
+
+ for (uint32_t i = 0; i < thread_cnt; i++) {
+ pthread_join(threads[i], (void **)(&exit_data[i])); // 等待每个线程结束
+ }
+}
+void ut_one_thread_create(struct ut_info_s *info, struct ut_queue *q, enum ut_thread_type ttype, int core, uint16_t thread_id, pthread_t *thread) {
+ UT_DBG_LOG("thread type:%d core:%d", ttype, core);
+ struct ut_thread_arg *arg = (struct ut_thread_arg *)ut_malloc(UT_MODULE_COMMON, sizeof(struct ut_thread_arg)); // 线程回收时free
+ arg->info = info;
+ arg->q = q;
+ arg->ttype = ttype;
+ arg->core = core;
+ arg->thread_idx = thread_id;
+
+ if (ttype == UT_THREAD_PRODUCER) {
+ pthread_create(thread, NULL, ut_thread_producer_start, arg);
+ } else {
+ pthread_create(thread, NULL, ut_thread_consumer_start, arg);
+ }
+}
+
+#define CORE_ID_CHK_SET(core_id, max_id) \
+ do { \
+ core_id = (core_id + 1) <= max_id ? (core_id + 1) : core_id; \
+ } while (0)
+
+pthread_t *ut_threads_create(struct ut_info_s *info, struct ut_queue *q) {
+ // 创建生产者消费者线程
+ uint16_t thread_id = 0;
+ struct ut_cfg *cfg = &info->cfg;
+ int core_id = cfg->base.core_begin;
+ size_t thread_cnt = cfg->ring.producer_cnt + cfg->ring.consumer_cnt;
+ pthread_t *threads = (pthread_t *)ut_malloc(UT_MODULE_COMMON, sizeof(pthread_t) * thread_cnt); // 存储所有线程ID的数组
+
+ pthread_barrier_init(&info->ctl.all_threads_start, NULL, thread_cnt);
+ info->ctl.running = true;
+ info->ctl.producer_exit = ATOMIC_VAR_INIT(0);
+
+ // MPSC 或 SPMC 场景在第一个核心/超线程上分配单个生产者或消费者,然后将其他线程按顺序分配给核心/超线程。
+ // MPMC,我们将生产者和消费者一一交错分配
+ // 如果数量不同,则在最后分配剩余部分。
+ if (cfg->ring.producer_cnt == 1 && cfg->ring.consumer_cnt >= 1) {
+ // SPMC,第一个核心给生产者,其他分配给消费者
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ for (uint32_t i = 0; i < cfg->ring.consumer_cnt; i++) {
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ }
+ } else if (cfg->ring.consumer_cnt == 1 && cfg->ring.producer_cnt >= 1) {
+ // MPSC,第一个核心给消费者,其他分配给生产者
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ for (uint32_t i = 0; i < cfg->ring.producer_cnt; i++) {
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ }
+ } else {
+ // MPMC 或 只有生产者 或这有消费者,核心交错分配
+ uint32_t pcnt = cfg->ring.producer_cnt; // 生产者个数
+ uint32_t ccnt = cfg->ring.consumer_cnt; // 消费者个数
+ for (core_id = cfg->base.core_begin; core_id < cfg->base.core_end && pcnt > 0 && ccnt > 0;) {
+ if ((core_id & 1) == 0) {
+ // 偶数
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ pcnt--;
+ } else {
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ ccnt--;
+ }
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ }
+
+ for (uint32_t i = 0; i < pcnt; i++) {
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ }
+
+ for (uint32_t i = 0; i < ccnt; i++) {
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ }
+ }
+
+ return threads;
+}
+
+void ut_threads_destory(struct ut_info_s *info, pthread_t *threads) {
+ pthread_barrier_destroy(&info->ctl.all_threads_start);
+ ut_free(UT_MODULE_COMMON, threads);
+}
+
+void ut_merge_data_detail(struct ut_merge_data *merge, struct ut_exit_data *exit_data) {
+ merge->run_times += exit_data->run_times;
+ merge->ok_cnt += exit_data->ok_cnt;
+ merge->latency_ns += exit_data->latency_ns;
+ merge->op_err_latency_ns = exit_data->op_err_latency_ns;
+ merge->op_ok_latency_ns += exit_data->op_ok_latency_ns;
+ merge->data_error_cnt += exit_data->data_error_cnt;
+}
+
+void ut_merge_all_data(struct ut_exit_data **exit_data, uint32_t thread_cnt, struct ut_merge_s *merge) {
+ struct ut_metric p_start = {0};
+ struct ut_metric p_end = {0};
+ struct ut_metric c_start = {0};
+ struct ut_metric c_end = {0};
+
+ for (uint32_t i = 0; i < thread_cnt; i++) {
+ // 根据生产者/消费者 线程最早开始和最晚结束,记录时间
+ if (exit_data[i]->arg->ttype == UT_THREAD_PRODUCER) {
+ if (ut_clock_time_is_zero(&p_start) || ut_timespec_is_after(&p_start.timestamp, &exit_data[i]->metric_start.timestamp)) {
+ p_start = exit_data[i]->metric_start;
+ }
+
+ if (ut_timespec_is_after(&exit_data[i]->metric_start.timestamp, &p_end.timestamp)) {
+ p_end = exit_data[i]->metric_end;
+ }
+
+ ut_merge_data_detail(&merge->producer, exit_data[i]);
+ } else {
+ if (ut_clock_time_is_zero(&c_start) || ut_timespec_is_after(&c_start.timestamp, &exit_data[i]->metric_start.timestamp)) {
+ c_start = exit_data[i]->metric_start;
+ }
+
+ if (ut_timespec_is_after(&exit_data[i]->metric_start.timestamp, &c_end.timestamp)) {
+ c_end = exit_data[i]->metric_end;
+ }
+
+ ut_merge_data_detail(&merge->consumer, exit_data[i]);
+ }
+ }
+
+ merge->producer.use_time = ut_clock_time_sub(p_end, p_start);
+ merge->consumer.use_time = ut_clock_time_sub(c_end, c_start);
+} \ No newline at end of file
diff --git a/bbq/unittest/ut_bbq_func.h b/bbq/unittest/ut_bbq_func.h
new file mode 100644
index 0000000..322f2cf
--- /dev/null
+++ b/bbq/unittest/ut_bbq_func.h
@@ -0,0 +1,302 @@
+/*
+ * @Author: liuyu
+ * @LastEditTime: 2024-07-07 21:57:13
+ * @Email: [email protected]
+ * @Describe: TODO
+ */
+#pragma once
+
+#include "bbq.h"
+#include <pthread.h>
+
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <time.h>
+
+#ifndef __cplusplus
+// C
+#include <stdatomic.h>
+#endif
+
+enum ut_thread_type {
+ UT_THREAD_PRODUCER,
+ UT_THREAD_CONSUMER,
+ UT_THREAD_TYPE_MAX,
+};
+
+struct ut_metric {
+ struct timespec timestamp; // 系统时间戳
+ // uint64_t cycles; // cpu运行的cycle
+};
+
+struct ut_report {
+ uint64_t throughput; // 吞吐量:每秒消耗的条目总数。
+ double data_latency; // 数据延迟:每个数据在队列中停留的平均时间。
+ double op_latency; // 操作延迟:每个入队或出队操作的平均延迟。
+ double *fairness; // 公平性:每个生产者/消费者的吞吐量(占总吞吐的百分比)
+ double full_empty; // 队列满时入队的延迟/队列空时出队的延迟(仅用于简单工作负载)。
+ uint64_t oversubscription; // 比核心/超线程更多的生产者和消费者的吞吐量
+};
+
+enum ut_workload {
+ UT_WORKLOAD_SIMPLE, // 简单负载,每个生产者或消费者都有自己的线程,它们在循环中不断执行入队或出队操作。每次出队后都会验证数据。
+ UT_WORKLOAD_COMPLEX, // 复杂负载,基于简单工作负载。生产者和消费者为数据分配空间,执行入队和出队,然后手动释放
+ UT_WORKLOAD_MAX,
+};
+
+#define UT_RING_TYPE_BBQ_STR "bbq"
+#define UT_RING_TYPE_DPDK_STR "dpdk"
+#define UT_RING_TYPE_RMIND_STR "rmind"
+enum ut_ring_type {
+ UT_RING_TYPE_BBQ,
+ UT_RING_TYPE_DPDK,
+ UT_RING_TYPE_RMIND,
+ UT_RING_TYPE_MAX,
+};
+
+struct ut_cfg_base {
+ char name[128]; // 配置文件名
+ char introduce[128]; // 测试配置说明
+ uint16_t core_begin; // 起始核心
+ uint16_t core_end; // 终止核心
+};
+
+struct ut_cfg_ring {
+ enum ut_ring_type ring_type; // ring buffer类型
+ uint32_t producer_cnt; // 生产者个数
+ uint32_t consumer_cnt; // 消费者个数
+ enum ut_workload workload; // 负载模式
+ uint64_t entries_cnt; // ring初始化时分配entry的个数
+ uint32_t block_count; // bbq block个数,为0时表示根据entries_cnt自动计算
+ uint32_t burst_cnt; // 批量出入队个数
+};
+
+struct ut_cfg_run {
+ uint64_t run_ok_times; // 成功入队/入队次数
+ uint64_t run_time; // 整体运行时间,单位秒
+};
+
+struct ut_cfg {
+ struct ut_cfg_base base;
+ struct ut_cfg_ring ring;
+ struct ut_cfg_run run;
+};
+
+struct ut_ctl {
+ volatile bool running; // 默认为true,当设置为false,即所有生产者消费者即将退出
+ pthread_barrier_t all_threads_start;
+#ifndef __cplusplus
+ // C
+ atomic_uint producer_exit;
+#else
+ // C++ 为了兼容gtest测试
+ std::atomic<uint32_t> producer_exit;
+#endif
+};
+
+struct ut_info_s {
+ struct ut_cfg cfg;
+ struct ut_ctl ctl;
+};
+
+enum ut_module {
+ UT_MODULE_UTEST,
+ UT_MODULE_COMMON,
+ UT_MODULE_DATA,
+ UT_MODULE_BCM,
+ UT_MODULE_TABLE,
+ UT_MODULE_RMIND,
+ UT_MODULE_MAX,
+};
+
+#ifdef UT_DEBUG
+#define UT_DBG_LOG(fmt, ...) \
+ do { \
+ printf("[DBG][%s:%d:%s]" fmt "\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \
+ } while (0)
+
+#else
+#define UT_DBG_LOG(fmt, ...) \
+ do { \
+ } while (0)
+#endif
+
+#define UT_ERR_LOG(fmt, ...) \
+ do { \
+ printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \
+ } while (0)
+
+#define UT_INFO_LOG(fmt, ...) \
+ do { \
+ printf("[INFO][%s:%d:%s]" fmt "\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \
+ } while (0)
+
+#define UT_AVOID_WARNING(param) ((void)param)
+
+#define UT_PTR_ARRAY_DATA_INIT(table, t_type, t_count) \
+ do { \
+ if (table != NULL) { \
+ memset(table, 0, sizeof(t_type *) * t_count); \
+ for (uint32_t i = 0; i < t_count; i++) { \
+ table[i] = (t_type *)ut_malloc(UT_MODULE_TABLE, sizeof(t_type)); \
+ if (table[i] == NULL) { \
+ for (uint32_t j = 0; j < i; j++) { \
+ ut_free(UT_MODULE_TABLE, table[j]); \
+ table[j] = NULL; \
+ } \
+ ut_free(UT_MODULE_TABLE, table); \
+ break; \
+ } \
+ *table[i] = (t_type)UT_DATA_MAGIC; \
+ } \
+ } \
+ } while (0)
+
+#define UT_PTR_ARRAY_DATA_DESTORY(table, t_count) \
+ do { \
+ if (table != NULL) { \
+ for (uint32_t i = 0; i < t_count; i++) { \
+ ut_free(UT_MODULE_TABLE, table[i]); \
+ table[i] = NULL; \
+ } \
+ } \
+ } while (0)
+
+#define UT_DOUBLE_PTR_DATA_INIT(table, t_type, t_count) \
+ do { \
+ table = (t_type **)ut_malloc(UT_MODULE_TABLE, sizeof(t_type *) * t_count); \
+ if (table != NULL) { \
+ UT_PTR_ARRAY_DATA_INIT(table, t_type, t_count); \
+ } \
+ } while (0)
+
+#define UT_DOUBLE_PTR_DATA_DESTORY(table, t_count) \
+ do { \
+ if (table != NULL) { \
+ UT_PTR_ARRAY_DATA_DESTORY(table, t_count); \
+ ut_free(UT_MODULE_TABLE, table); \
+ table = NULL; \
+ } \
+ } while (0)
+
+#define UT_ARRAY_DATA_INIT(table, t_count) \
+ do { \
+ for (int i = 0; i < t_count; i++) { \
+ table[i] = UT_DATA_MAGIC; \
+ } \
+ } while (0)
+
+#define UT_DATA_MAGIC 0x1F // 为了兼容所有类型的数据,存储1字节大小的数据
+
+typedef void (*ut_ring_free_f)(void *ring);
+typedef int (*ut_ring_enqueue_f)(void *ring, void *obj);
+typedef int (*ut_ring_dequeue_f)(void *ring, void *obj);
+typedef uint32_t (*ut_enqueue_burst_f)(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed);
+typedef uint32_t (*ut_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n, uint32_t *wait_consumed);
+typedef bool (*ut_ring_empty_f)(void *ring);
+
+struct ut_queue {
+ void *ring;
+ enum ut_ring_type ring_type;
+ ut_ring_free_f ring_free_f;
+ ut_ring_enqueue_f enqueue_f;
+ ut_ring_dequeue_f dequeue_f;
+ ut_enqueue_burst_f enqueue_burst_f;
+ ut_dequeue_burst_f dequeue_burst_f;
+};
+
+struct ut_thread_arg {
+ int core;
+ uint16_t thread_idx; // 线程索引,不是pthread_id
+ enum ut_thread_type ttype;
+ struct ut_info_s *info;
+ struct ut_queue *q;
+};
+
+struct ut_data {
+ uint64_t data; // 数据
+ struct ut_metric enqueue_time; // 入队时间
+};
+
+struct ut_exit_data {
+ pthread_t thread_id;
+ uint64_t run_times;
+ uint64_t ok_cnt;
+ uint64_t latency_ns; // 仅消费者有效,数据停留的时延
+ uint64_t op_ok_latency_ns; // 成功操作的时延
+ uint64_t op_err_latency_ns; // 操作失败的时延, 如满队入队,空队出队
+ uint64_t data_error_cnt; // 发生过至少一次数据不一致的次数
+ size_t simple_data_cnt;
+ struct ut_thread_arg *arg;
+ struct ut_metric metric_start;
+ struct ut_metric metric_end;
+ struct ut_data **simple_data;
+};
+
+struct ut_merge_data {
+ uint64_t run_times;
+ uint64_t ok_cnt;
+ uint64_t latency_ns; // 仅消费者有效,数据停留的时延
+ uint64_t op_ok_latency_ns; // 成功操作的时延
+ uint64_t op_err_latency_ns; // 操作失败的时延, 如满队入队,空队出队
+ uint64_t data_error_cnt; // 发生过至少一次数据不一致的次数
+ struct ut_metric use_time;
+};
+
+struct ut_merge_s {
+ struct ut_merge_data producer;
+ struct ut_merge_data consumer;
+};
+
+enum ut_data_type {
+ UT_DATA_MAGIC_TYPE,
+ UT_DATA_UINTPTR_TYPE,
+};
+extern struct ut_metric ut_clock_time_get();
+extern struct ut_metric ut_clock_time_sub(struct ut_metric now, struct ut_metric last);
+extern int ut_load_config(const char *config, const char *ring_type, uint32_t burst_cnt, struct ut_cfg *cfg);
+extern enum ut_workload ut_workload_str2enum(const char *workload);
+extern enum ut_ring_type ut_ring_type_str2enum(const char *ring_type);
+extern bool ut_clock_time_is_zero(struct ut_metric *metric);
+extern bool ut_timespec_is_after(const struct timespec *a, const struct timespec *b);
+extern char *ut_ring_type_enum2str(enum ut_ring_type ring_type);
+extern uint64_t ut_clock_time_to_ns(struct ut_metric *metric);
+extern double ut_clock_time_to_double(struct ut_metric *metric);
+extern void *ut_malloc(enum ut_module module, size_t size);
+extern void ut_free(enum ut_module module, void *ptr);
+extern void ut_memory_counter_print();
+extern void ut_memory_counter_clear();
+extern bool ut_malloc_free_equal();
+extern int ut_setaffinity(int core_id);
+extern void *ut_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size);
+extern void ut_free_def_callback(void *ptr, size_t size __attribute__((unused)));
+extern void ut_threads_destory(struct ut_info_s *info, pthread_t *threads);
+extern pthread_t *ut_threads_create(struct ut_info_s *info, struct ut_queue *q);
+extern void ut_one_thread_create(struct ut_info_s *info, struct ut_queue *q, enum ut_thread_type ttype, int core, uint16_t thread_id, pthread_t *thread);
+extern void ut_wait_all_threads_exit(struct ut_info_s *info, uint32_t thread_cnt, pthread_t *threads, struct ut_exit_data **exit_data);
+extern void *ut_thread_consumer_start(void *arg);
+extern void *ut_thread_producer_start(void *arg);
+extern uint32_t ut_exec_dequeue(struct ut_queue *q, struct ut_data **data, size_t burst_cnt, struct ut_metric *op_use_diff);
+extern uint32_t ut_exec_enqueue(struct ut_queue *q, struct ut_data **data, size_t burst_cnt, struct ut_metric *op_use_diff, uint16_t thread_idx);
+extern void ut_exit_data_destory(struct ut_exit_data *data);
+extern struct ut_exit_data *ut_exit_data_create(struct ut_thread_arg *t_arg);
+extern void ut_wait_all_threads_ready(struct ut_ctl *ctl);
+extern void ut_queue_destory(struct ut_queue *q);
+extern int ut_queue_init_bbq(struct ut_cfg *cfg, struct ut_queue *q);
+extern void ut_merge_all_data(struct ut_exit_data **exit_data, uint32_t thread_cnt, struct ut_merge_s *merge);
+extern uint64_t bbq_head_idx(struct bbq *q, uint64_t x);
+extern uint64_t bbq_cur_off(struct bbq *q, uint64_t x);
+extern uint64_t bbq_head_vsn(struct bbq *q, uint64_t x);
+extern uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x);
+extern struct ut_data **ut_data_create(size_t cnt, enum ut_data_type);
+extern void ut_data_destory(struct ut_data **data, size_t cnt);
+extern bool bbq_debug_check_array_bounds(struct bbq *q);
+extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn,
+ uint32_t bs, size_t obj_size,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f);
+extern struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f); \ No newline at end of file