diff options
| author | 刘煜 <[email protected]> | 2024-07-02 04:10:13 +0000 |
|---|---|---|
| committer | 刘煜 <[email protected]> | 2024-07-02 04:10:13 +0000 |
| commit | d893e4382c694e16c37bb18713e4f54d604fce55 (patch) | |
| tree | d33d775e0bea8c469d8c7682168e3a9ae286e0c0 | |
| parent | 7f309a3257c04abbf20e5467d081da96413e4d21 (diff) | |
| parent | 45de126d21a9079f0830f12f98c07fb7c5f6451e (diff) | |
Merge branch 'dev-perf-single' into 'dev'
1.性能优化(内存池、spsc优化)2、bugfix 游标溢出问题 3、feature:malloc 和free函数通过参数传入
See merge request liuyu/bbq!8
| -rw-r--r-- | .vscode/settings.json | 7 | ||||
| -rw-r--r-- | bbq/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | bbq/include/bbq.h | 100 | ||||
| -rw-r--r-- | bbq/src/bbq.c | 437 | ||||
| -rw-r--r-- | bbq/tests/common/test_mix.c | 6 | ||||
| -rw-r--r-- | bbq/tests/common/test_queue.c | 36 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_example.cc | 28 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_head_cursor.cc | 104 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_mix.cc | 20 | ||||
| -rw-r--r-- | bbq/tests/unittest/ut_multit.cc | 4 | ||||
| -rw-r--r-- | perf/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | perf/benchmark/bcm_benchmark.c | 6 | ||||
| -rw-r--r-- | perf/benchmark/bcm_queue.c | 22 | ||||
| -rwxr-xr-x | perf/benchmark/benchmark.sh | 6 |
14 files changed, 471 insertions, 309 deletions
diff --git a/.vscode/settings.json b/.vscode/settings.json index 2aff784..4814c5b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -92,6 +92,9 @@ "test_mix.h": "c", "test_queue.h": "c", "prctl.h": "c", - "types.h": "c" - } + "types.h": "c", + "chrono": "c", + "fstream": "c" + }, + "commentTranslate.hover.enabled": true }
\ No newline at end of file diff --git a/bbq/CMakeLists.txt b/bbq/CMakeLists.txt index 1242a45..e8f8a9b 100644 --- a/bbq/CMakeLists.txt +++ b/bbq/CMakeLists.txt @@ -22,6 +22,7 @@ add_compile_options(-Wall -Wextra) if(CMAKE_BUILD_TYPE STREQUAL "Debug") add_definitions(-DBBQ_MEMORY) endif() +add_definitions(-D_GNU_SOURCE) # 库生成的路径 set(LIB_PATH ${OUTPUT_DIR}/lib) diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index 3b8269d..8c622b7 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: [email protected] - * @LastEditTime: 2024-06-27 03:04:19 + * @LastEditTime: 2024-07-01 23:56:27 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -15,49 +15,76 @@ // C #include <stdatomic.h> typedef atomic_uint_fast64_t aotmic_uint64; -typedef aotmic_uint64 bbq_cursor; -typedef aotmic_uint64 bbq_head; #else // C++ 为了兼容gtest测试 -using bbq_cursor = std::atomic<uint64_t>; -using bbq_head = std::atomic<uint64_t>; using aotmic_uint64 = std::atomic<uint64_t>; #endif #define BBQ_SOCKET_ID_ANY -1 -#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) #define BBQ_SYMBOL_MAX 64 +#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) + +struct bbq_atomic64 { + bool single; // 如果为单生产者或单消费者,则single为true + union { + volatile uint64_t s; // single使用该字段 + aotmic_uint64 m; + }; +} __BBQ_CACHE_ALIGNED; struct bbq_block { - bbq_cursor committed; // 已提交(version|offset) - bbq_cursor allocated; // 已分配(version|offset) - bbq_cursor reserved; // 已预留(version|offset) - bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size + struct bbq_atomic64 committed; // 已提交(version|offset) + struct bbq_atomic64 allocated; // 已分配(version|offset) + struct bbq_atomic64 reserved; // 已预留(version|offset) + struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到 + char *entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; +typedef void *(*bbq_malloc_f)(int32_t socket_id, size_t size); +typedef void (*bbq_free_f)(void *ptr, size_t size); + struct bbq { char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; - int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 - uint32_t bn; // blocks的个数 - uint32_t bs; // blocks.entries的个数 - uint32_t flags; // 标记:retry new 模式,还是drop old模式 - uint32_t idx_bits; // bbq_head里idx所占的位数 - uint32_t off_bits; // bbq_cursor里offset所占的位数 - uint64_t idx_mask; // idx_bits偏移后的掩码 - uint64_t off_mask; // off_bits偏移后的掩码 - uint64_t entry_size; // blocks.entries里每个entry的大小 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t bn; // blocks的个数 + uint32_t bs; // blocks.entries的个数 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + uint64_t entry_size; // blocks.entries里每个entry的大小 + bbq_malloc_f malloc_f; // 申请内存的函数,默认为malloc + bbq_free_f free_f; // 申请内存的函数,默认为free + + struct bbq_atomic64 phead; // 生产者头,指向块的索引,分为两部分:version|idx + struct bbq_atomic64 chead; // 消费者头,指向块的索引,分为两部分:version|idx + struct { + struct bbq_atomic64 n_enq; + struct bbq_atomic64 n_deq; + } stat; struct bbq_block *blocks; // bn大小的数组 + + struct { + char *ptr; // 内存池起始地址 + size_t off; // 已使用的偏移大小 + size_t size; // 内存池总大小 + } memory_pool; } __BBQ_CACHE_ALIGNED; #define BBQ_F_DEFAULT 0x0 #define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ #define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ + +#define BBQ_F_SP_ENQ 0x0004 +#define BBQ_F_MP_ENQ BBQ_F_DEFAULT +#define BBQ_F_SC_DEQ 0x0008 +#define BBQ_F_MC_DEQ BBQ_F_DEFAULT +#define BBQ_F_ENABLE_STAT 0x0010 +#define BBQ_F_DISABLE_STAT BBQ_F_DEFAULT /** * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 * 对应入队函数:bbq_enqueue、bbq_enqueue_burst @@ -72,11 +99,17 @@ struct bbq { * 当检测到不支持多numa,将转为malloc分配内存。 * @param[in] flags * 设置入队策略: - * - BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 - * 设置生产者消费者模式: - * - BBQ_F_SP_ENQ:单生产者 BBQ_F_MP_ENQ:多生产者(默认) - * - BBQ_F_SC_DEQ:单消费者 BBQ_F_MC_DEQ:多消费者(默认) + * - BBQ_F_RETRY_NEW:队列满了当前入队失败(默认)。 + * 设置生产者模式: + * - BBQ_F_SP_ENQ:单生产者 + * - BBQ_F_MP_ENQ:多生产者(默认) + * 设置消费者模式: + * - BBQ_F_SC_DEQ:单消费者 + * - BBQ_F_MC_DEQ:多消费者(默认) + * 设置统计功能: + * - BBQ_F_ENABLE_STAT:开启统计功能 + * - BBQ_F_DISABLE_STAT:关闭统计功能(默认) * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 * NULL:创建失败,可通过bbq_errno分析具体错误原因: @@ -85,7 +118,8 @@ struct bbq { * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 * - BBQ_ERR_INPUT_NULL:name传入空指针 */ -extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags); +extern struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f); /** * 消息队列单个指针入队 @@ -192,7 +226,9 @@ extern uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, u * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 * - BBQ_ERR_INPUT_NULL:name传入空指针 */ -extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f); /** * 消息队列单个数据入队(指针指向的数据将被拷贝) @@ -295,6 +331,14 @@ extern void bbq_destory(struct bbq *q); */ extern void bbq_debug_struct_print(struct bbq *q); +/** + * 打印指定块信息(调试用)。 + * + * @param[in] q + * 队列指针 + */ +extern void bbq_debug_block_print(struct bbq *q, struct bbq_block *block); + // 错误码 #define BBQ_OK 0 // 成功 #define BBQ_ERR -1 // 通用错误,无法分类时使用 diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index 5e63c51..ac74894 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-26 04:59:28 + * @LastEditTime: 2024-07-01 23:56:23 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -18,6 +18,9 @@ // 判断flags标记位 #define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) #define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE) +#define BBQ_F_CHK_SP_ENQ(flags) (flags & BBQ_F_SP_ENQ) +#define BBQ_F_CHK_SC_DEQ(flags) (flags & BBQ_F_SC_DEQ) +#define BBQ_F_CHK_STAT_ENABLE(flags) (flags & BBQ_F_ENABLE_STAT) // 避免无用参数的编译告警 #define AVOID_WARNING(param) ((void)param) @@ -78,66 +81,75 @@ static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) { return ver << q->off_bits; } +uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic) { + if (atomic->single) { + return atomic->s; + } else { + return atomic_load(&atomic->m); + } +} + +void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value) { + if (atomic->single) { + atomic->s = value; + } else { + atomic_store(&atomic->m, value); + } +} +static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value) { + if (atomic->single) { + uint64_t old = atomic->s; + atomic->s += value; + return old; + } else { + return atomic_fetch_add(&atomic->m, value); + } +} + // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -enum bbq_module { - BBQ_MODULE_MAIN = 0, - BBQ_MODULE_BLOCK_NB, - BBQ_MODULE_BLOCK_ENTRY, - BBQ_MODULE_MAX, -}; #ifdef BBQ_MEMORY #define BBQ_MEM_MAGIC 0xFF struct bbq_memory_s { - aotmic_uint64 malloc_cnt; - aotmic_uint64 malloc_size; - aotmic_uint64 free_cnt; - aotmic_uint64 free_size; + uint64_t malloc_cnt; + uint64_t malloc_size; + uint64_t free_cnt; + uint64_t free_size; }; -struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0}; +struct bbq_memory_s bbq_memory_g = {0}; #endif -static void *bbq_malloc(enum bbq_module module, int socket_id, size_t size) { - void *ptr = NULL; - if (socket_id >= 0) { - ptr = numa_alloc_onnode(size, 0); - } else { - ptr = malloc(size); - } +void *bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size) { #ifdef BBQ_MEMORY - if (ptr != NULL) { - atomic_fetch_add(&bbq_memory_g[module].malloc_cnt, 1); - atomic_fetch_add(&bbq_memory_g[module].malloc_size, size); - } -#else - AVOID_WARNING(module); + bbq_memory_g.malloc_cnt++; + bbq_memory_g.malloc_size += size; #endif - return ptr; + return malloc(size); } -static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t size) { - if (socket_id >= 0) { - numa_free(ptr, size); - } else { - free(ptr); - } - +void bbq_free_def_callback(void *ptr, + size_t size __attribute__((unused))) { #ifdef BBQ_MEMORY - if (ptr != NULL) { - atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1); - atomic_fetch_add(&bbq_memory_g[module].free_size, size); + if (ptr) { + bbq_memory_g.free_cnt++; + bbq_memory_g.free_size += size; } -#else - AVOID_WARNING(module); #endif + free(ptr); } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd) { +uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd) { uint64_t old_value = 0; - do { - old_value = atomic_load(atom); - } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd)); + + if (atomic->single) { + old_value = atomic->s; + atomic->s = upd; + } else { + do { + old_value = atomic_load(&atomic->m); + } while (old_value < upd && !atomic_compare_exchange_weak(&atomic->m, &old_value, upd)); + } return old_value; } @@ -183,17 +195,32 @@ int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) { return BBQ_OK; } +void *bbq_malloc_from_pool(struct bbq *q, size_t size) { + if (q->memory_pool.size - q->memory_pool.off < size) { + return NULL; + } + + char *mem = q->memory_pool.ptr + q->memory_pool.off; + q->memory_pool.off += size; + + return mem; +} + /* 块初始化 */ -int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { +int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_t flags) { + size_t size = 0; + #ifdef BBQ_MEMORY // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题 - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - (q->bs + 1) * q->entry_size); + size = (q->bs + 1) * q->entry_size; + block->entries = bbq_malloc_from_pool(q, size); char *last_entry = block->entries + q->entry_size * q->bs; + memset(block->entries, 0, size); memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - q->bs * q->entry_size); + size = q->bs * q->entry_size; + block->entries = bbq_malloc_from_pool(q, size); + memset(block->entries, 0, size); #endif if (block->entries == NULL) { @@ -201,20 +228,23 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { return bbq_errno; } - block->committed = ATOMIC_VAR_INIT(0); - block->allocated = ATOMIC_VAR_INIT(0); - block->reserved = ATOMIC_VAR_INIT(0); - block->consumed = ATOMIC_VAR_INIT(0); + if (BBQ_F_CHK_SP_ENQ(flags)) { + block->allocated.single = true; + block->committed.single = true; + } + + if (BBQ_F_CHK_SC_DEQ(flags)) { + block->reserved.single = true; + block->consumed.single = true; + } if (cursor_init) { // block数组里,除了第一块之外需要设置 - block->committed = ATOMIC_VAR_INIT(q->bs); - block->allocated = ATOMIC_VAR_INIT(q->bs); - block->reserved = ATOMIC_VAR_INIT(q->bs); - if (BBQ_F_CHK_DROP_OLD(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed - } else { - block->consumed = ATOMIC_VAR_INIT(q->bs); + bbq_atomic64_store(&block->committed, q->bs); + bbq_atomic64_store(&block->allocated, q->bs); + bbq_atomic64_store(&block->reserved, q->bs); + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { + bbq_atomic64_store(&block->consumed, q->bs); } } @@ -225,11 +255,9 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { void block_destory(struct bbq *q, struct bbq_block *block) { if (block->entries) { #ifdef BBQ_MEMORY - bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - block->entries, (q->bs + 1) * q->entry_size); + q->free_f(block->entries, (q->bs + 1) * q->entry_size); #else - bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - block->entries, q->bs * q->entry_size); + q->free_f(block->entries, q->bs * q->entry_size); #endif block->entries = NULL; } @@ -252,9 +280,11 @@ static unsigned bbq_ceil_log2(uint64_t x) { } /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ -static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { +static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { int ret = 0; - + size_t size = 0; if (bbq_check_power_of_two(bn) == false) { bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; @@ -280,37 +310,67 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq *q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q)); + if (malloc_f == NULL) { + malloc_f = bbq_malloc_def_callback; + } + + if (free_f == NULL) { + free_f = bbq_free_def_callback; + } + + uint32_t all_size = 0; +#ifdef BBQ_MEMORY + all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * (bs + 1) * obj_size; +#else + all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * bs * obj_size; +#endif + struct bbq *q = malloc_f(socket_id, all_size); if (q == NULL) { bbq_errno = BBQ_ERR_ALLOC; return NULL; } - memset(q, 0, sizeof(*q)); + + memset(q, 0, all_size); + q->memory_pool.size = all_size; + q->memory_pool.ptr = (char *)q; + q->memory_pool.off += sizeof(struct bbq); + ret = snprintf(q->name, sizeof(q->name), "%s", name); q->bn = bn; q->bs = bs; q->entry_size = obj_size; q->socket_id = socket_id; - q->phead = ATOMIC_VAR_INIT(0); - q->chead = ATOMIC_VAR_INIT(0); + if (BBQ_F_CHK_SP_ENQ(flags)) { + q->phead.single = true; + } + if (BBQ_F_CHK_SC_DEQ(flags)) { + q->chead.single = true; + } q->flags = flags; + q->malloc_f = malloc_f; + q->free_f = free_f; - q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + size = bn * sizeof(*q->blocks); + q->blocks = bbq_malloc_from_pool(q, size); if (q->blocks == NULL) { bbq_errno = BBQ_ERR_ALLOC; goto error; } - memset(q->blocks, 0, sizeof(*q->blocks)); + memset(q->blocks, 0, size); for (uint32_t i = 0; i < bn; ++i) { - ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true)); + ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true), flags); if (ret != BBQ_OK) { goto error; } } q->idx_bits = bbq_ceil_log2(bn); - q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 TODO:bs占位少的时候多分一些? + uint32_t off_bits = bbq_ceil_log2(bs); + if (off_bits < 13) { + off_bits = 13; + } + q->off_bits = off_bits; // 多线程同时FAA,可能会超过最大索引,因此多分配一些空间,防止FAA溢出 TODO:多少合适?ver溢出问题? q->idx_mask = (1 << q->idx_bits) - 1; q->off_mask = (1 << q->off_bits) - 1; @@ -323,19 +383,25 @@ error: } /* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */ -struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { +struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; - return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); } /* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */ -struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { +struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; - return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) { +struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; uint32_t bn = 0; uint32_t bs = 0; @@ -344,10 +410,11 @@ struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, i return NULL; } - return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); } -struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) { +struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; uint32_t bn = 0; uint32_t bs = 0; @@ -356,7 +423,7 @@ struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t return NULL; } - return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ @@ -365,18 +432,13 @@ void bbq_destory(struct bbq *q) { return; } - for (uint32_t i = 0; i < q->bn; ++i) { - block_destory(q, &(q->blocks[i])); - } - - bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); - bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q)); + q->free_f(q->memory_pool.ptr, q->memory_pool.size); } #define BBQ_DATA_TYPE_SINGLE 0x0 #define BBQ_DATA_TYPE_ARRAY_1D 0x1 #define BBQ_DATA_TYPE_ARRAY_2D 0x2 -void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) { +void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) { size_t idx = e->off * q->entry_size; if (BBQ_F_CHK_COPY_VALUE(q->flags)) { @@ -414,23 +476,24 @@ void commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uin break; } } - atomic_fetch_add(&e->block->committed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst); } -struct bbq_queue_state_s allocate_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { +struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) { struct bbq_queue_state_s state = {0}; - if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) { + uint64_t ph_idx = bbq_head_idx(q, ph); + + struct bbq_block *block = &(q->blocks[ph_idx]); + if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } - uint64_t old = atomic_fetch_add(&block->allocated, n); - // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 - uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed)); - + uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n); uint64_t cur_vsn = bbq_cur_vsn(q, old); uint64_t cur_off = bbq_cur_off(q, old); - if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) { + + if (cur_off >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } @@ -457,13 +520,13 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { uint64_t ph_vsn = bbq_head_vsn(q, ph); if (BBQ_F_CHK_DROP_OLD(q->flags)) { - cur = atomic_load(&n_blk->committed); + cur = bbq_atomic64_load(&n_blk->committed); // 生产者head避免覆盖上一轮尚未完全提交的区块 if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; } } else { - cur = atomic_load(&n_blk->consumed); + cur = bbq_atomic64_load(&n_blk->consumed); uint64_t reserved; uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); @@ -471,7 +534,7 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { if (consumed_vsn < ph_vsn || (consumed_vsn == ph_vsn && consumed_off != q->bs)) { // 生产者赶上了消费者 - reserved = atomic_load(&n_blk->reserved); + reserved = bbq_atomic64_load(&n_blk->reserved); if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; } else { @@ -480,9 +543,11 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { } } - // 用head的version初始化下一个块 - // version在高位,version+1,index或offset也会被清零,如果没有被其他线程执行过。多线程同时只更新一次。 + // 用head的version值初始化下一个块,version在高位,version+1,index或offset也会被清零 uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1); + // 其他线程完成了head更新,当前bbq_fetch_max不会再更新,可能在以下两种情况: + // 1)实际ph_vsn与本次要更新的ph_vsn相同。 + // 2)当前ph_vsn已经落后于实际的ph_vsn(且移动到了下一轮), bbq_fetch_max(&n_blk->committed, new_vsn); bbq_fetch_max(&n_blk->allocated, new_vsn); @@ -491,27 +556,37 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { return BBQ_SUCCESS; } -static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t *ph_ptr, struct bbq_block *blk_ph) { +static uint32_t bbq_wait_consumed_get_by_stat(struct bbq *q, uint64_t enq_update, uint64_t deq_update) { + uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->stat.n_enq) : enq_update; + uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->stat.n_deq) : deq_update; + return enq_now - deq_now; +} + +/* 根据实际head以及块上的游标推算出待消费的个数,该函数很影响性能 */ +__attribute__((unused)) static uint32_t bbq_wait_consumed_get_by_head(struct bbq *q, + uint64_t *ch_ptr, + uint64_t *ph_ptr, + struct bbq_block *blk_ph) { uint64_t ch = 0; uint64_t ph = 0; if (ch_ptr != NULL) { ch = *ch_ptr; } else { - ch = atomic_load(&q->chead); + ch = bbq_atomic64_load(&q->chead); } if (ph_ptr != NULL) { ph = *ph_ptr; } else { - ph = atomic_load(&q->phead); + ph = bbq_atomic64_load(&q->phead); } uint64_t ph_idx = bbq_head_idx(q, ph); uint64_t ch_idx = bbq_head_idx(q, ch); - uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed)); + uint64_t committed_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ph->committed)); struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); - uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved)); + uint64_t reserved_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ch->reserved)); // "生产者"超过"消费者"块的个数 uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; @@ -539,6 +614,7 @@ static uint32_t bbq_wait_consumed_set(struct bbq *q, uint64_t *ch_ptr, uint64_t //----------------------------------------------------------------- /* 消息队列入队 */ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { + uint64_t enq_update = 0; struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || data == NULL) { @@ -548,15 +624,18 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } while (true) { - uint64_t ph = atomic_load(&q->phead); - struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ph)]); - struct bbq_queue_state_s state = allocate_entry(q, blk, n); + uint64_t ph = bbq_atomic64_load(&q->phead); + struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n); switch (state.state) { case BBQ_ALLOCATED: - commit_entry(q, &state.e, data, data_type); + bbq_commit_entry(q, &state.e, data, data_type); ret.actual_burst = state.e.actual_burst; ret.status = BBQ_OK; + + if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { + enq_update = bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst) + state.e.actual_burst; + } break; case BBQ_BLOCK_DONE: { enum bbq_queue_state pstate = advance_phead(q, ph); @@ -583,8 +662,8 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t break; } - if (wait_consumed != NULL) { - *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk); + if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { + *wait_consumed = bbq_wait_consumed_get_by_stat(q, enq_update, 0); } return ret; @@ -604,40 +683,37 @@ int bbq_enqueue_elem(struct bbq *q, void const *data) { } /* 更新成功 reserve成功的个数 */ -uint32_t bbq_reserve_update(bbq_cursor *aotmic, uint64_t reserved, uint32_t n) { - // TODO:逻辑可以合并 - if (n == 1) { - // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (bbq_fetch_max(aotmic, reserved + 1) == reserved) { - return 1; - } - - return 0; +uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n) { + if (atomic->single) { + atomic->s += n; + return n; } else { - bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n); - return ret == true ? n : 0; + // TODO:合并逻辑? + if (n == 1) { + // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 + if (bbq_fetch_max(atomic, reserved + 1) == reserved) { + return 1; + } + + return 0; + } else { + // 不能用fetch_max??? + bool ret = atomic_compare_exchange_weak(&atomic->m, &reserved, reserved + n); + return ret == true ? n : 0; + } } } struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { + uint32_t cont = 0; while (true) { struct bbq_queue_state_s state; - uint64_t reserved = atomic_load(&block->reserved); + uint64_t reserved = bbq_atomic64_load(&block->reserved); uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) { - uint64_t consumed = atomic_load(&block->consumed); - // TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理? - // TODO:这个情况可能出现? - if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) { - // consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到 - state.state = BBQ_BLOCK_DONE; - state.vsn = reserved_svn; - return state; - } - - uint64_t committed = atomic_load(&block->committed); + uint64_t committed = bbq_atomic64_load(&block->committed); uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; @@ -646,7 +722,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 if (committed_off != q->bs) { - uint64_t allocated = atomic_load(&block->allocated); + uint64_t allocated = bbq_atomic64_load(&block->allocated); if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; @@ -655,7 +731,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc uint32_t tmp = committed_off - reserved_off; uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); - if (reserved_cnt > 0) { // TODO:多entry时关注 + if (reserved_cnt > 0) { state.state = BBQ_RESERVED; state.e.actual_burst = reserved_cnt; state.e.block = block; @@ -664,7 +740,8 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc return state; } else { - // 如果不等于代表block.reserved被其他线程Reserved了 + // 已经被其他线程更新过了,当前数据为旧数据,需要重新获取 + cont++; continue; } } @@ -712,13 +789,13 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint uint64_t allocated; if (BBQ_F_CHK_DROP_OLD(q->flags)) { // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? - allocated = atomic_load(&e->block->allocated); + allocated = bbq_atomic64_load(&e->block->allocated); // 预留的entry所在的块,已经被新生产的数据赶上了 if (bbq_cur_vsn(q, allocated) != e->vsn) { return false; } } else { - atomic_fetch_add(&e->block->consumed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst); } return true; @@ -729,7 +806,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t committed = atomic_load(&n_blk->committed); + uint64_t committed = bbq_atomic64_load(&n_blk->committed); uint64_t committed_vsn = bbq_cur_vsn(q, committed); if (BBQ_F_CHK_DROP_OLD(q->flags)) { // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. @@ -753,6 +830,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { /* 消息队列出队 */ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { + uint64_t deq_update = 0; struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { bbq_errno = BBQ_ERR_INPUT_NULL; @@ -761,7 +839,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } while (true) { - uint64_t ch = atomic_load(&q->chead); + uint64_t ch = bbq_atomic64_load(&q->chead); struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; state = bbq_reserve_entry(q, blk, n); @@ -773,6 +851,10 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } ret.status = BBQ_OK; ret.actual_burst = state.e.actual_burst; + + if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { + deq_update = bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst) + state.e.actual_burst; + } break; case BBQ_NO_ENTRY: bbq_errno = BBQ_ERR_EMPTY; @@ -795,8 +877,8 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n break; } - if (wait_consumed != NULL) { - *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk); + if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { + *wait_consumed = bbq_wait_consumed_get_by_stat(q, 0, deq_update); } return ret; @@ -933,8 +1015,8 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *ob } bool bbq_empty(struct bbq *q) { - uint64_t phead = atomic_load(&q->phead); - uint64_t chead = atomic_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead); + uint64_t chead = bbq_atomic64_load(&q->chead); uint64_t ph_vsn = bbq_head_vsn(q, phead); uint64_t ch_vsn = bbq_head_vsn(q, chead); @@ -949,12 +1031,12 @@ bool bbq_empty(struct bbq *q) { block = &q->blocks[ph_idx]; if (ph_vsn == ch_vsn) { - if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) { + if (bbq_cur_off(q, bbq_atomic64_load(&block->reserved)) == bbq_cur_off(q, bbq_atomic64_load(&block->committed))) { return true; } } - bbq_cursor reserved = atomic_load(&block->reserved); + uint64_t reserved = bbq_atomic64_load(&block->reserved); uint64_t reserved_off = bbq_cur_off(q, reserved); if (BBQ_F_CHK_DROP_OLD(q->flags) && @@ -995,26 +1077,19 @@ uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint bool bbq_malloc_free_equal() { #ifdef BBQ_MEMORY - bool ret = true; - for (int i = 0; i < BBQ_MODULE_MAX; i++) { - uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); - uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); - if (malloc_cnt != free_cnt) { - BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", i, malloc_cnt, free_cnt); - ret = false; - } + if (bbq_memory_g.malloc_cnt != bbq_memory_g.free_cnt) { + BBQ_ERR_LOG("malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", + bbq_memory_g.malloc_cnt, bbq_memory_g.free_cnt); + return false; + } - uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size); - uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size); - if (malloc_size != free_size) { - BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, free_size); - ret = false; - } + if (bbq_memory_g.malloc_size != bbq_memory_g.free_size) { + BBQ_ERR_LOG("malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", + bbq_memory_g.malloc_size, bbq_memory_g.free_size); + return false; } - return ret; -#else - return true; #endif + return true; } bool bbq_debug_check_array_bounds(struct bbq *q) { @@ -1037,17 +1112,9 @@ bool bbq_debug_check_array_bounds(struct bbq *q) { void bbq_debug_memory_print() { #ifdef BBQ_MEMORY - for (int i = 0; i < BBQ_MODULE_MAX; i++) { - uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); - uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); - if (malloc_cnt == 0 && free_cnt == 0) { - continue; - } - - printf("[%d]bbq malloc:%lu free:%lu\n", i, - atomic_load(&bbq_memory_g[i].malloc_cnt), - atomic_load(&bbq_memory_g[i].free_cnt)); - } + printf("bbq malloc:%lu free:%lu\n", + bbq_memory_g.malloc_cnt, + bbq_memory_g.free_cnt); if (bbq_malloc_free_equal()) { printf("all memory free\n"); @@ -1058,23 +1125,25 @@ void bbq_debug_memory_print() { } void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { - bbq_cursor allocated = atomic_load(&block->allocated); - bbq_cursor committed = atomic_load(&block->committed); - bbq_cursor reserved = atomic_load(&block->reserved); - bbq_cursor consumed = atomic_load(&block->consumed); - printf(" allocated:%lu committed:%lu reserved:%lu", - bbq_cur_off(q, allocated), bbq_cur_off(q, committed), bbq_cur_off(q, reserved)); + uint64_t allocated = bbq_atomic64_load(&block->allocated); + uint64_t committed = bbq_atomic64_load(&block->committed); + uint64_t reserved = bbq_atomic64_load(&block->reserved); + uint64_t consumed = bbq_atomic64_load(&block->consumed); + printf(" allocated:%lu(ver:%lu) committed:%lu(ver:%lu) reserved:%lu(ver:%lu)", + bbq_cur_off(q, allocated), bbq_cur_vsn(q, allocated), + bbq_cur_off(q, committed), bbq_cur_vsn(q, committed), + bbq_cur_off(q, reserved), bbq_cur_vsn(q, reserved)); if (BBQ_F_CHK_DROP_OLD(q->flags)) { printf("\n"); } else { - printf(" consumed:%lu\n", bbq_cur_off(q, consumed)); + printf(" consumed:%lu(ver:%lu)\n", bbq_cur_off(q, consumed), bbq_cur_vsn(q, consumed)); } } void bbq_debug_struct_print(struct bbq *q) { printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); - uint64_t phead = atomic_load(&q->phead); - uint64_t chead = atomic_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead); + uint64_t chead = bbq_atomic64_load(&q->chead); printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); diff --git a/bbq/tests/common/test_mix.c b/bbq/tests/common/test_mix.c index 28468f1..713dc99 100644 --- a/bbq/tests/common/test_mix.c +++ b/bbq/tests/common/test_mix.c @@ -1,9 +1,8 @@ /* * @Description: 描述信息 * @Date: 2024-05-25 10:55:48 - * @LastEditTime: 2024-06-17 17:14:16 + * @LastEditTime: 2024-06-27 23:39:44 */ -#define _GNU_SOURCE #include "test_mix.h" #include "bbq.h" #include <pthread.h> @@ -157,11 +156,10 @@ char *test_ring_type_enum2str(test_ring_type ring_type) { int test_setaffinity(int core_id) { cpu_set_t mask; CPU_ZERO(&mask); - CPU_SET(core_id, &mask); if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) == -1) { - fprintf(stderr, "pthread_setaffinity_np erro\n"); + TEST_ERR_LOG("pthread_setaffinity_np erro\n"); return BBQ_ERR; } diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c index 0934285..a833843 100644 --- a/bbq/tests/common/test_queue.c +++ b/bbq/tests/common/test_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-27 02:50:17 + * @LastEditTime: 2024-07-01 23:56:45 * @Email: [email protected] * @Describe: TODO */ @@ -11,8 +11,12 @@ #include <sys/prctl.h> #include <unistd.h> extern bool bbq_debug_check_array_bounds(struct bbq *q); -extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); -extern struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, + uint32_t bs, size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f); +extern struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f); uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { TEST_AVOID_WARNING(thread_idx); @@ -20,12 +24,27 @@ uint32_t test_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16 } int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { +#if 0 + // 开启了BBQ_F_ENABLE_STAT 会导致性能下降 + unsigned int flags = BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT; +#else + unsigned int flags = BBQ_F_RETRY_NEW; +#endif + + if (cfg->ring.producer_cnt <= 1) { + flags |= BBQ_F_SP_ENQ; + } + + if (cfg->ring.consumer_cnt <= 1) { + flags |= BBQ_F_SC_DEQ; + } + if (cfg->ring.block_count == 0) { - q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q->ring = bbq_create("test_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, flags, NULL, NULL); } else { q->ring = bbq_create_with_bnbs("test_bbq", cfg->ring.block_count, cfg->ring.entries_cnt / cfg->ring.block_count, - BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + BBQ_SOCKET_ID_ANY, flags, NULL, NULL); } if (q->ring == NULL) { @@ -108,13 +127,8 @@ uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) { uint32_t enqueue_cnt = 0; test_time_metric op_use_start = test_clock_time_get(); -#if 0 - // wait_consumed会导致bbq损失部分性能 uint32_t wait_consumed = 0; enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, &wait_consumed); -#else - enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, NULL); -#endif *op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start); return enqueue_cnt; @@ -153,6 +167,7 @@ void *test_thread_producer_start(void *arg) { } test_wait_all_threads_ready(&test_info->ctl); + // TEST_INFO_LOG("producer thread:%lx, core:%d", exit_data->thread_id, t_arg->core); exit_data->metric_start = test_clock_time_get(); while (true) { @@ -225,6 +240,7 @@ void *test_thread_consumer_start(void *arg) { } test_wait_all_threads_ready(&test_info->ctl); + // TEST_INFO_LOG("consumer thread:%lx, core:%d", exit_data->thread_id, t_arg->core); exit_data->metric_start = test_clock_time_get(); diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc index 0439612..11735cc 100644 --- a/bbq/tests/unittest/ut_example.cc +++ b/bbq/tests/unittest/ut_example.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 11:40:27 + * @LastEditTime: 2024-07-01 03:56:54 * @Email: [email protected] * @Describe: 简单的测试用例,测试基本功能 */ @@ -52,7 +52,7 @@ TEST_F(bbq_example, single_retry_new_cp_ptr) { uint16_t *deq_data = NULL; // 创建队列 - struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); // 空队出队失败 @@ -97,7 +97,7 @@ TEST_F(bbq_example, single_retry_new_cp_value) { uint16_t deq_data; // 创建队列 - struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); // 空队出队失败 @@ -144,7 +144,7 @@ TEST_F(bbq_example, single_drop_old_cp_pointer) { uint64_t second_cnt = 1000; // 创建队列 - struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + struct bbq *q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_LT(second_cnt, q->bs * q->bn); @@ -198,7 +198,7 @@ TEST_F(bbq_example, single_drop_old_cp_value) { uint64_t second_cnt = 1000; // 创建队列 - struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + struct bbq *q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_LT(second_cnt, q->bs * q->bn); @@ -254,7 +254,7 @@ TEST_F(bbq_example, burst_retry_new_cp_value) { uint32_t wait_consumed = 0; // 创建队列 - q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_LT(first_cnt, q->bn * q->bs); @@ -304,7 +304,7 @@ TEST_F(bbq_example, burst_retry_new_cp_pointer) { uint16_t **deq_table2 = (uint16_t **)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT); // 创建队列 - q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_LT(first_cnt, q->bn * q->bs); @@ -353,7 +353,7 @@ TEST_F(bbq_example, burst_drop_old_cp_pointer) { uint16_t **deq_table2 = (uint16_t **)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t *) * BUF_CNT); // 创建队列 - q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + q = bbq_create("test_bbq", BUF_CNT, BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_GT(second_cnt, q->bs); EXPECT_LT(second_cnt, q->bs * q->bn); @@ -361,18 +361,18 @@ TEST_F(bbq_example, burst_drop_old_cp_pointer) { // 批量入队(全部成功,入队个数等于队列总容量,未发生覆盖) ret1 = bbq_enqueue_burst(q, (void *const *)enq_table1, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); - EXPECT_EQ(wait_consumed, ret1); + // EXPECT_EQ(wait_consumed, ret1); // 批量入队(全部成功),覆盖了旧数据 ret2 = bbq_enqueue_burst(q, (void *const *)enq_table2, second_cnt, &wait_consumed); EXPECT_EQ(ret2, second_cnt); - EXPECT_EQ(wait_consumed, second_cnt - q->bs); + // EXPECT_EQ(wait_consumed, second_cnt - q->bs); // 出队列(部分成功) // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。本例中第一个完整块作废。 ret1 = bbq_dequeue_burst(q, (void **)deq_table1, BUF_CNT, &wait_consumed); EXPECT_EQ(ret1, second_cnt - q->bs); - EXPECT_EQ(wait_consumed, 0); + // EXPECT_EQ(wait_consumed, 0); // 验证数据 for (uint32_t i = 0; i < ret1; i++) { @@ -402,7 +402,7 @@ TEST_F(bbq_example, burst_drop_old_cp_value) { uint16_t deq_table1[BUF_CNT] = {0}; // 创建队列 - q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + q = bbq_create_elem("test_bbq", BUF_CNT, sizeof(uint16_t), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_GT(second_cnt, q->bs); EXPECT_LT(second_cnt, q->bs * q->bn); @@ -410,13 +410,13 @@ TEST_F(bbq_example, burst_drop_old_cp_value) { // 批量入队(全部成功) ret1 = bbq_enqueue_burst_elem(q, (void const *)enq_table3, first_cnt, &wait_consumed); EXPECT_EQ(ret1, first_cnt); - EXPECT_EQ(wait_consumed, ret1); + // EXPECT_EQ(wait_consumed, ret1); // 批量入队(全部成功),覆盖了旧数据 // 由于需要将最终的值入队列,二维数组里的值不连续,需要循环赋值。不推荐这个函数,但可用于特殊场景。 ret2 = bbq_enqueue_burst_elem_two_dimensional(q, (void *const *)enq_table1, second_cnt, &wait_consumed); EXPECT_EQ(ret2, second_cnt); - EXPECT_EQ(wait_consumed, second_cnt - q->bs); + // EXPECT_EQ(wait_consumed, second_cnt - q->bs); // 出队列(部分成功) // 一旦生产者追上了消费者,之前未消费的,以及当前块的数据全都作废了。 diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc index e3b3b50..bbd2296 100644 --- a/bbq/tests/unittest/ut_head_cursor.cc +++ b/bbq/tests/unittest/ut_head_cursor.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 11:42:49 + * @LastEditTime: 2024-07-01 03:57:33 * @Email: [email protected] * @Describe: TODO */ @@ -10,7 +10,10 @@ extern "C" { #include "ut.h" extern bool bbq_malloc_free_equal(); extern bool bbq_debug_check_array_bounds(struct bbq *q); -extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f); +extern uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic); } class bbq_head_cursor : public testing::Test { // 继承了 testing::Test @@ -30,33 +33,39 @@ class bbq_head_cursor : public testing::Test { // 继承了 testing::Test }; void expect_phead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - EXPECT_EQ(bbq_head_idx(q, q->phead), idx) << "line: " << line; - EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line; + uint64_t ph = bbq_atomic64_load(&q->phead); + EXPECT_EQ(bbq_head_idx(q, ph), idx) << "line: " << line; + EXPECT_EQ(bbq_head_vsn(q, ph), vsn) << "line: " << line; } void expect_chead(struct bbq *q, uint64_t idx, uint64_t vsn, int line) { - EXPECT_EQ(bbq_head_idx(q, q->chead), idx) << "line: " << line; - EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line; + uint64_t ch = bbq_atomic64_load(&q->chead); + EXPECT_EQ(bbq_head_idx(q, ch), idx) << "line: " << line; + EXPECT_EQ(bbq_head_vsn(q, ch), vsn) << "line: " << line; } void expect_eq_allocated(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->allocated), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line; + uint64_t allocated = bbq_atomic64_load(&block->allocated); + EXPECT_EQ(bbq_cur_off(q, allocated), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, allocated), vsn) << "line: " << line; } void expect_eq_committed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->committed), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line; + uint64_t committed = bbq_atomic64_load(&block->committed); + EXPECT_EQ(bbq_cur_off(q, committed), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, committed), vsn) << "line: " << line; } void expect_eq_consumed(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->consumed), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line; + uint64_t consumed = bbq_atomic64_load(&block->consumed); + EXPECT_EQ(bbq_cur_off(q, consumed), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, consumed), vsn) << "line: " << line; } void expect_eq_reserved(struct bbq *q, bbq_block *block, uint64_t off, uint64_t vsn, int line) { - EXPECT_EQ(bbq_cur_off(q, block->reserved), off) << "line: " << line; - EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line; + uint64_t reserved = bbq_atomic64_load(&block->reserved); + EXPECT_EQ(bbq_cur_off(q, reserved), off) << "line: " << line; + EXPECT_EQ(bbq_cur_vsn(q, reserved), vsn) << "line: " << line; } // 初始化状态 @@ -64,12 +73,13 @@ TEST_F(bbq_head_cursor, init) { struct bbq *q; uint32_t bn = 2; uint32_t bs = 4; - q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); // 1.初始化状态,除了第一个block外其他块的4个游标都指向最后一个条目 - EXPECT_EQ(q->phead, 0); - EXPECT_EQ(q->chead, 0); + + EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_eq_allocated(q, &q->blocks[0], 0, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], 0, 0, __LINE__); @@ -96,7 +106,7 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_GT(produce_cnt, 0); EXPECT_LE(produce_cnt, bs); - q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); // 生产produce_cnt @@ -105,8 +115,8 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_TRUE(ret == BBQ_OK); } - EXPECT_EQ(q->phead, 0); - EXPECT_EQ(q->chead, 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_reserved(q, &q->blocks[0], 0, 0, __LINE__); @@ -119,8 +129,8 @@ void ut_produce_something(uint32_t produce_cnt) { EXPECT_EQ(dequeue_data, TEST_DATA_MAGIC); } - EXPECT_EQ(q->phead, 0); - EXPECT_EQ(q->chead, 0); + EXPECT_EQ(bbq_atomic64_load(&q->phead), 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_eq_allocated(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], produce_cnt, 0, __LINE__); expect_eq_reserved(q, &q->blocks[0], produce_cnt, 0, __LINE__); @@ -155,7 +165,7 @@ void ut_produce_next_block(uint32_t over) { EXPECT_GT(over, 0); EXPECT_LT(over, bs); - q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); // 生产至第二块的第一个entry @@ -164,7 +174,7 @@ void ut_produce_next_block(uint32_t over) { EXPECT_TRUE(ret == BBQ_OK); } - EXPECT_EQ(q->chead, 0); + EXPECT_EQ(bbq_atomic64_load(&q->chead), 0); expect_phead(q, 1, 0, __LINE__); expect_eq_allocated(q, &q->blocks[0], bs, 0, __LINE__); expect_eq_committed(q, &q->blocks[0], bs, 0, __LINE__); @@ -216,7 +226,7 @@ void ut_produce_all_loop(uint32_t loop) { int enqueue_data = TEST_DATA_MAGIC; int dequeue_data = 0; - q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); for (uint32_t cnt = 0; cnt < loop; cnt++) { @@ -266,12 +276,13 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { uint32_t entries_cnt = 4096; uint32_t loop = 1000; struct bbq *q; - + uint64_t ph = 0; + uint64_t ch = 0; int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt); int tmp_data = 0; EXPECT_TRUE(data); - q = bbq_create_elem("test_bbq", entries_cnt, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + q = bbq_create_elem("test_bbq", entries_cnt, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_TRUE(bbq_empty(q)); @@ -290,14 +301,17 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { EXPECT_TRUE(ret == BBQ_ERR_FULL); } + ph = bbq_atomic64_load(&q->phead); + ch = bbq_atomic64_load(&q->chead); if (i == 0) { - EXPECT_EQ((q->phead.load() + 1) & q->idx_mask, q->chead.load() & q->idx_mask); + EXPECT_EQ((ph + 1) & q->idx_mask, ch & q->idx_mask); } else { - EXPECT_EQ((q->phead.load()) & q->idx_mask, q->chead.load() & q->idx_mask); + EXPECT_EQ((ph)&q->idx_mask, ch & q->idx_mask); } + for (uint32_t i = 0; i < q->bn; i++) { - EXPECT_EQ(q->blocks[i].committed.load() & q->off_mask, q->bs); - EXPECT_GE(q->blocks[i].allocated.load() & q->off_mask, q->bs); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs); + EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated) & q->off_mask, q->bs); } // 全出队 @@ -315,12 +329,14 @@ TEST_F(bbq_head_cursor, retry_new_full_empty) { EXPECT_TRUE(ret == BBQ_ERR_EMPTY); } - EXPECT_EQ(q->phead.load() & q->idx_mask, q->chead.load() & q->idx_mask); + ph = bbq_atomic64_load(&q->phead); + ch = bbq_atomic64_load(&q->chead); + EXPECT_EQ(ph & q->idx_mask, ch & q->idx_mask); for (uint32_t i = 0; i < q->bn; i++) { - EXPECT_EQ(q->blocks[i].committed.load() & q->off_mask, q->bs); - EXPECT_GE(q->blocks[i].allocated.load() & q->off_mask, q->bs); - EXPECT_EQ(q->blocks[i].consumed.load() & q->off_mask, q->bs); - EXPECT_GE(q->blocks[i].reserved.load() & q->off_mask, q->bs); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].committed) & q->off_mask, q->bs); + EXPECT_GE(bbq_atomic64_load(&q->blocks[i].allocated) & q->off_mask, q->bs); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed) & q->off_mask, q->bs); + EXPECT_GE(bbq_atomic64_load(&q->blocks[i].reserved) & q->off_mask, q->bs); } } @@ -342,13 +358,13 @@ TEST_F(bbq_head_cursor, mpsc_faa) { .producer_cnt = 10, .consumer_cnt = 1, .workload = TEST_WORKLOAD_SIMPLE, - .entries_cnt = 1, + .entries_cnt = 4, .block_count = 1, .burst_cnt = 1, }, .run = { - .run_ok_times = 0, - .run_time = 5, + .run_ok_times = 9000000, + .run_time = 0, }, }, .ctl = { @@ -401,7 +417,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) { struct bbq *q; int tmp_data = 0; - q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_TRUE(bbq_empty(q)); @@ -434,7 +450,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty) { expect_eq_committed(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__); expect_eq_allocated(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__); expect_eq_reserved(q, &q->blocks[i], q->bs, i == 0 ? j : j + 1, __LINE__); - EXPECT_EQ(q->blocks[i].consumed.load(), 0); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0); } } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); @@ -452,7 +468,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { EXPECT_EQ(over_cnt / bs, 1); int tmp_data = 0; - q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD); + q = bbq_create_elem_with_bnbs("test_bbq", bn, bs, sizeof(int), BBQ_SOCKET_ID_ANY, BBQ_F_DROP_OLD, NULL, NULL); ASSERT_NE(q, nullptr); EXPECT_TRUE(bbq_empty(q)); @@ -486,7 +502,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { expect_eq_reserved(q, &q->blocks[i], i == 0 ? 0 : bs, 0, __LINE__); - EXPECT_EQ(q->blocks[i].consumed.load(), 0); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0); } // 队列中的数据全出队 @@ -514,7 +530,7 @@ TEST_F(bbq_head_cursor, drop_old_full_empty_cover) { expect_eq_reserved(q, &q->blocks[i], i == bn - 1 ? over_cnt - bs : bs, i == 1 ? loop + 1 : 0, __LINE__); - EXPECT_EQ(q->blocks[i].consumed.load(), 0); + EXPECT_EQ(bbq_atomic64_load(&q->blocks[i].consumed), 0); } EXPECT_TRUE(bbq_debug_check_array_bounds(q)); diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc index fa2f07f..cd545bf 100644 --- a/bbq/tests/unittest/ut_mix.cc +++ b/bbq/tests/unittest/ut_mix.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-26 03:37:56 + * @LastEditTime: 2024-06-27 07:25:43 * @Email: [email protected] * @Describe: bbq除了队列操作外,其他函数的测试 */ @@ -11,10 +11,12 @@ extern "C" { #include <math.h> extern bool bbq_check_power_of_two(int n); extern unsigned bbq_ceil_log2(uint64_t x); -extern uint64_t bbq_fetch_max(aotmic_uint64 *atom, uint64_t upd); +extern uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd); extern bool bbq_malloc_free_equal(); extern bool test_malloc_free_equal(); extern int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs); +extern void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value); +extern uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic); } class bbq_mix : public testing::Test { // 继承了 testing::Test @@ -35,7 +37,7 @@ class bbq_mix : public testing::Test { // 继承了 testing::Test typedef struct { uint64_t thread_cnt; - aotmic_uint64 data; + bbq_atomic64 data; aotmic_uint64 ready_thread_cnt; } ut_fetch_arg; @@ -54,12 +56,18 @@ void *fetch_max_thread_func(void *arg) { TEST_F(bbq_mix, bbq_fetch_max) { uint64_t ret = 0; - ut_fetch_arg arg = {}; - arg.data.store(1); // 初始化1 + ut_fetch_arg arg; + + arg.data.single = false; + arg.data.m.store(0); + arg.thread_cnt = 0; + arg.ready_thread_cnt.store(0); + + bbq_atomic64_store(&arg.data, 1); // 初始化1 arg.thread_cnt = 50; ret = bbq_fetch_max(&arg.data, 2); // max比较后设置为2 - EXPECT_EQ(arg.data.load(), 2); + EXPECT_EQ(bbq_atomic64_load(&arg.data), 2); EXPECT_EQ(ret, 1); pthread_t *threads = (pthread_t *)test_malloc(TEST_MODULE_UTEST, sizeof(*threads) * arg.thread_cnt); diff --git a/bbq/tests/unittest/ut_multit.cc b/bbq/tests/unittest/ut_multit.cc index bbe963b..31ea246 100644 --- a/bbq/tests/unittest/ut_multit.cc +++ b/bbq/tests/unittest/ut_multit.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 11:30:59 + * @LastEditTime: 2024-06-30 21:56:17 * @Email: [email protected] * @Describe: TODO */ @@ -49,7 +49,7 @@ TEST_F(multit, mpmc) { .burst_cnt = 4, }, .run = { - .run_ok_times = 50000, + .run_ok_times = 9000000, .run_time = 0, }, }, diff --git a/perf/CMakeLists.txt b/perf/CMakeLists.txt index 40e8326..803ebbe 100644 --- a/perf/CMakeLists.txt +++ b/perf/CMakeLists.txt @@ -8,6 +8,7 @@ include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/iniparser ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/rmind_ringbuf /root/code/c/dpdk-21.11.4/install/include + #/home/liuyu/code/marsio/build/support/dpdk/include ) # 将bbq单元测试里的公共文件,添加到perf里。 @@ -24,6 +25,7 @@ if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build, options are: Debug Release RelWithDebInfo MinSizeRel." FORCE) endif() +add_definitions(-D_GNU_SOURCE) add_compile_options(-Wall -Wextra) # 库生成的路径 @@ -35,6 +37,7 @@ set(EXEC_PATH ${OUTPUT_DIR}/bin) link_directories(${LIB_PATH}) link_directories(../bbq/build/output/lib/) link_directories(/root/code/c/dpdk-21.11.4/install/lib64 /root/code/c/dpdk-21.11.4/install/lib64/dpdk/pmds-22.0) +# link_directories(/home/liuyu/code/marsio/build/support/dpdk/lib64 /home/liuyu/code/marsio/build/support/dpdk/lib64/dpdk/pmds-22.0) # 可执行程序的名字 set(BENCHMARK_NAME benchmark) diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c index 801cd3a..5407d2b 100644 --- a/perf/benchmark/bcm_benchmark.c +++ b/perf/benchmark/bcm_benchmark.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-18 18:20:02 + * @LastEditTime: 2024-06-27 07:11:39 * @Email: [email protected] * @Describe: TODO */ @@ -101,8 +101,8 @@ int main(int argc, char *argv[]) { burst_cnt = 1; } } else { - config = "/root/code/c/bbq-ly/perf/benchmark/config/compare/case1_simple_spsc.ini"; - ring_type = "bbq"; + config = "/root/code/c/bbq/perf/benchmark/config/compare/case1_simple_spsc.ini"; + ring_type = "dpdk"; burst_cnt = 16; TEST_ERR_LOG("use default config, ringt_type:%s burst:%u config:%s argc:%d", ring_type, burst_cnt, config, argc); } diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c index 5cbcea7..462afcf 100644 --- a/perf/benchmark/bcm_queue.c +++ b/perf/benchmark/bcm_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 14:14:40 + * @LastEditTime: 2024-06-27 22:15:13 * @Email: [email protected] * @Describe: TODO */ @@ -10,20 +10,24 @@ static __rte_always_inline unsigned int bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) { TEST_AVOID_WARNING(thread_idx); - unsigned int free_space = 0; int ret = 0; - ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space); - *wait_consumed = r->size - free_space - 1; + if (wait_consumed) { + unsigned int free_space = 0; + ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, &free_space); + *wait_consumed = r->size - free_space - 1; + } else { + ret = rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL); + } + return ret; } int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) { - char *argv[1] = {"bcm_dpdk"}; - - // eal环境初始化,如接口、内存等 - if (rte_eal_init(1, argv) < 0) { - rte_exit(EXIT_FAILURE, "Error with EAL init\n"); + /* generate eal parameters */ + const char *eal_args[] = {"bcm_dpdk", "-n", "4", "--proc-type", "auto", "--no-huge", "-m", "2048"}; + if (rte_eal_init(RTE_DIM(eal_args), (char **)eal_args) < 0) { + return -1; } q->ring_type = TEST_RING_TYPE_DPDK; diff --git a/perf/benchmark/benchmark.sh b/perf/benchmark/benchmark.sh index 6889de3..abde645 100755 --- a/perf/benchmark/benchmark.sh +++ b/perf/benchmark/benchmark.sh @@ -1,7 +1,7 @@ #!/bin/bash ### # @Author: liuyu -# @LastEditTime: 2024-06-14 14:37:48 + # @LastEditTime: 2024-06-30 21:43:01 # @Email: [email protected] # @Describe: 运行性能测试的脚本 ### @@ -29,8 +29,8 @@ function exec_benchmark_ring_type() { # 如果以perf开头的配置文件,还要执行perf统计 if [[ $(basename "$ini") == perf* ]]; then echo "skip perf*" - # echo perf stat -a -e L1-dcache-loads,L1-dcache-load-misses,cache-references,cache-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" - # perf stat -a -e L1-dcache-loads,L1-dcache-load-misses,cache-references,cache-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" + # echo perf stat -e L1-dcache-loads,L1-dcache-load-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" + # perf stat -e L1-dcache-loads,L1-dcache-load-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" else "$BENCHMARK_PATH" "$ini" "$ring" "$burst" 2>&1 | tee -a "$log_file" fi |
