From 39e368c5ae0f2dc3bd33ab03c990f866468b0a32 Mon Sep 17 00:00:00 2001 From: liuyu Date: Thu, 20 Jun 2024 21:50:42 +0800 Subject: 替换bbq MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/CMakeLists.txt | 2 +- infra/include/bbq.h | 262 ++++++++++ infra/include/vnode.h | 78 ++- infra/src/bbq.c | 1200 ++++++++++++++++++++++++++++++++++++++++++++++ infra/src/vnode_common.c | 27 +- infra/src/vnode_common.h | 4 + infra/src/vnode_mirror.c | 26 +- infra/test/TestVNode.cc | 109 ++++- 8 files changed, 1651 insertions(+), 57 deletions(-) create mode 100644 infra/include/bbq.h create mode 100644 infra/src/bbq.c diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index d03ee25..8af6e68 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -8,7 +8,7 @@ include_directories(${CMAKE_BINARY_DIR}/include/libevent/include) include_directories(include) include_directories(../service/include) -add_library(infra src/cJSON.c src/vnode_common.c src/vnode_mirror.c src/pkt_classifier_engine.c src/port_adapter_mapping.c src/link_db.c src/ldbc.c src/version.c src/rpc.c src/dp_trace.c) +add_library(infra src/cJSON.c src/vnode_common.c src/vnode_mirror.c src/pkt_classifier_engine.c src/port_adapter_mapping.c src/link_db.c src/ldbc.c src/version.c src/rpc.c src/dp_trace.c src/bbq.c) target_link_libraries(infra rt libevent-static libevent-static-pthreads pthread dl MESA_prof_load_static libdpdk) target_include_directories(infra INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include/") diff --git a/infra/include/bbq.h b/infra/include/bbq.h new file mode 100644 index 0000000..6e4d84e --- /dev/null +++ b/infra/include/bbq.h @@ -0,0 +1,262 @@ +/* + * @Author: liuyu@geedgenetworks.com + * @LastEditTime: 2024-06-20 18:03:59 + * @Describe: bbq(Block-based Bounded Queue)头文件 + * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf + */ + +#pragma once + +#include +#include +#include + +#ifndef __cplusplus +// C +#include +typedef atomic_uint_fast64_t aotmic_uint64; +typedef aotmic_uint64 bbq_cursor; +typedef aotmic_uint64 bbq_head; +#else +// C++ 为了兼容gtest测试 +using bbq_cursor = std::atomic; +using bbq_head = std::atomic; +using aotmic_uint64 = std::atomic; +#endif + +#define BBQ_SOCKET_ID_ANY -1 +#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) + +struct bbq_block +{ + bbq_cursor committed; // 已提交(version|offset) + bbq_cursor allocated; // 已分配(version|offset) + bbq_cursor reserved; // 已预留(version|offset) + bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 + char * entries; // 存储大小可变的entry,分配空间大小:bs * entry_size +} __BBQ_CACHE_ALIGNED; + +struct bbq +{ + size_t bn; // blocks的个数 + size_t bs; // blocks.entries的个数 + size_t entry_size; // blocks.entries里每个entry的大小 + + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + + struct bbq_block * blocks; // bn大小的数组 +}; + +#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ +// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */ +// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */ + +#define BBQ_F_DEFAULT 0x0 +#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ +#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */ +#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */ +/** + * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 + * 对应入队函数:bbq_enqueue、bbq_enqueue_burst + * 对应出队函数:bbq_dequeue、bbq_dequeue_burst + * + * @param[in] count + * 队列所有entry的个数,count必须大于1,且是2的N次方。 + * @param[in] socket_id + * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 + * 当检测到不支持多numa,将转为malloc分配内存。 + * @param[in] flags + * 设置入队策略: + * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 + * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * @return + * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 + * NULL:创建失败。 + */ +extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags); + +/** + * 消息队列单个指针入队 + * + * @param[in] q + * 队列指针 + * @param[in] data + * 则传入一维指针,如: + * int *data = malloc(sizeof(int));*data = 1; 传入&data + * @return + * 成功返回0,失败返回小于0的错误码。 + */ +extern int bbq_enqueue(struct bbq * q, void * const * data); + +/** + * 消息队列单个指针出队 + * + * @param[in] q + * 队列指针 + * @param[out] data + * 则传入二维指针,如: + * int *data = NULL; 传入&data + * @return + * 成功返回0,失败返回小于0的错误码。 + */ +extern int bbq_dequeue(struct bbq * q, void ** data); + +/** + * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数 + * + * @param[in] q + * 队列指针 + * @param[in] obj_table + * 即将入队的指针数组,如: + * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT); + * for(int i=0;i #include +#include -#define __DECLARE_COMMON_VNODE_CREATE_PROD(_type) \ -struct vnode_prod * vnode_##_type##_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq); +// #define BBQ_SPSC -#define __DECLARE_COMMON_VNODE_CREATE_CONS(_type) \ -struct vnode_cons * vnode_##_type##_create_cons(struct vnode * vnode, const char * symbol, int nr_prodq); +#define __DECLARE_COMMON_VNODE_CREATE_PROD(_type) \ + struct vnode_prod * vnode_##_type##_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq); -#define __DECLARE_COMMON_VNODE_PROD_LOOKUP(_type) \ -struct vnode_prod * vnode_##_type##_prod_lookup(struct vnode * vnode, const char * sym); +#define __DECLARE_COMMON_VNODE_CREATE_CONS(_type) \ + struct vnode_cons * vnode_##_type##_create_cons(struct vnode * vnode, const char * symbol, int nr_prodq); -#define __DECLARE_COMMON_VNODE_CONS_LOOKUP(_type) \ -struct vnode_cons * vnode_##_type##_cons_lookup(struct vnode * vnode, const char * sym); +#define __DECLARE_COMMON_VNODE_PROD_LOOKUP(_type) \ + struct vnode_prod * vnode_##_type##_prod_lookup(struct vnode * vnode, const char * sym); -#define __DECLARE_COMMON_VNODE_PROD_ATTACH(_type) \ -int vnode_##_type##_prod_attach(struct vnode * node, struct vnode_prod* prod); +#define __DECLARE_COMMON_VNODE_CONS_LOOKUP(_type) \ + struct vnode_cons * vnode_##_type##_cons_lookup(struct vnode * vnode, const char * sym); -#define __DECLARE_COMMON_VNODE_CONS_ATTACH(_type) \ -int vnode_##_type##_cons_attach(struct vnode * node, struct vnode_cons * cons); +#define __DECLARE_COMMON_VNODE_PROD_ATTACH(_type) \ + int vnode_##_type##_prod_attach(struct vnode * node, struct vnode_prod * prod); -#define __DECLARE_COMMON_VNODE_PROD_STAT_GET(_type) \ -struct vnode_prod_stat * vnode_##_type##_prod_stat_get(struct vnode_prod * prod); +#define __DECLARE_COMMON_VNODE_CONS_ATTACH(_type) \ + int vnode_##_type##_cons_attach(struct vnode * node, struct vnode_cons * cons); -#define __DECLARE_COMMON_VNODE_CONS_STAT_GET(_type) \ -struct vnode_cons_stat * vnode_##_type##_cons_stat_get(struct vnode_cons * cons); +#define __DECLARE_COMMON_VNODE_PROD_STAT_GET(_type) \ + struct vnode_prod_stat * vnode_##_type##_prod_stat_get(struct vnode_prod * prod); -#define __DECLARE_COMMON_VNODE_DELETE_PROD(_type) \ -int vnode_##_type##_delete_prod(struct vnode_prod * prod); +#define __DECLARE_COMMON_VNODE_CONS_STAT_GET(_type) \ + struct vnode_cons_stat * vnode_##_type##_cons_stat_get(struct vnode_cons * cons); -#define __DECLARE_COMMON_VNODE_DELETE_CONS(_type) \ -int vnode_##_type##_delete_cons(struct vnode_cons * cons); +#define __DECLARE_COMMON_VNODE_DELETE_PROD(_type) int vnode_##_type##_delete_prod(struct vnode_prod * prod); -#define __DECLARE_COMMON_VNODE_UNPOISON_PROD(_type) \ -void vnode_##_type##_unpoison_prod(struct vnode_prod * prod); +#define __DECLARE_COMMON_VNODE_DELETE_CONS(_type) int vnode_##_type##_delete_cons(struct vnode_cons * cons); -#define __DECLARE_COMMON_VNODE_UNPOISON_CONS(_type) \ -void vnode_##_type##_unpoison_cons(struct vnode_cons * cons); +#define __DECLARE_COMMON_VNODE_UNPOISON_PROD(_type) void vnode_##_type##_unpoison_prod(struct vnode_prod * prod); -#define __DECLARE_COMMON_VNODE_NOTIFY_CTX_CONS(_type) \ -struct vnode_cons_notify * vnode_##_type##_notify_ctx_cons(struct vnode_cons * cons); +#define __DECLARE_COMMON_VNODE_UNPOISON_CONS(_type) void vnode_##_type##_unpoison_cons(struct vnode_cons * cons); +#define __DECLARE_COMMON_VNODE_NOTIFY_CTX_CONS(_type) \ + struct vnode_cons_notify * vnode_##_type##_notify_ctx_cons(struct vnode_cons * cons); struct vnode_cons_stat { - volatile uint64_t on_line; - volatile uint64_t deliver; - volatile uint64_t missed; - volatile uint64_t total_len; + volatile uint64_t on_line; + volatile uint64_t deliver; + volatile uint64_t missed; + volatile uint64_t total_len; volatile unsigned int q_len_max; volatile float q_len_avg_max; @@ -64,10 +62,10 @@ struct vnode_cons_stat struct vnode_prod_stat { - volatile uint64_t on_line; - volatile uint64_t deliver; - volatile uint64_t missed; - volatile uint64_t total_len; + volatile uint64_t on_line; + volatile uint64_t deliver; + volatile uint64_t missed; + volatile uint64_t total_len; volatile int64_t ring_elem_count_avg; volatile int64_t ring_shared_credict_avg; @@ -97,11 +95,11 @@ struct vnode_cons_notify volatile int cons_running_status; }; -#define VNODE_STAT_READ(_addr) *(_addr) +#define VNODE_STAT_READ(_addr) *(_addr) enum _vnode_type { - VNODE_TYPE_MIRROR, + VNODE_TYPE_MIRROR, }; struct vnode; @@ -111,7 +109,7 @@ struct vnode_cons; int vnode_mirror_enqueue_bulk(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[], uint32_t hash[], unsigned int nr_objects); -int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[], +int vnode_mirror_dequeue_burst(struct vnode_cons * cons, unsigned int consq, struct rte_mbuf * objects[], int nr_max_objects); int vnode_mirror_rt_object_retrieve(struct vnode_prod * prod, unsigned int prodq, struct rte_mbuf * objects[], diff --git a/infra/src/bbq.c b/infra/src/bbq.c new file mode 100644 index 0000000..992b8f7 --- /dev/null +++ b/infra/src/bbq.c @@ -0,0 +1,1200 @@ +/* + * @Author: liuyu + * @LastEditTime: 2024-06-20 18:58:05 + * @Email: liuyu@geedgenetworks.com + * @Describe: bbq(Block-based Bounded Queue)实现 + * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf + */ +#include "bbq.h" +#include +#include +#include +#include + +// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针" +#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */ +#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ + +// 判断flags标记位 +#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) +#define BBQ_F_CHK_VALUE(flags) (flags & BBQ_F_COPY_VALUE) + +// 避免无用参数的编译告警 +#define AVOID_WARNING(param) ((void)param) + +#define BBQ_ERR_LOG(fmt, ...) \ + do \ + { \ + printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \ + } while (0) + +struct bbq_status +{ + int32_t status; // 返回状态 + uint32_t actual_burst; // 实际出/入队个数 +}; + +enum bbq_queue_state +{ + BBQ_SUCCESS = 0, + BBQ_BLOCK_DONE, // 当前块的entry已用完,需要移动到下一个块 + BBQ_NO_ENTRY, // 队列里没有entry可以使用了 + BBQ_NOT_AVAILABLE, // 当前块不可以用状态(将返回busy) + BBQ_ALLOCATED, // 已分配,返回entry信息 + BBQ_RESERVED, // 已保留,返回entry信息 +}; + +struct bbq_entry_desc +{ + uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释 + uint64_t off; // entry在当前block的偏移(offset) + uint32_t actual_burst; // 实际出/入队个数 + struct bbq_block * block; // 指向所在的block +}; + +struct bbq_queue_state_s +{ + enum bbq_queue_state state; // 队列状态 + union { // TODO: + uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 + struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效 + }; +}; + +extern inline uint64_t bbq_idx(struct bbq * q, uint64_t x) +{ + return x & q->idx_mask; +} + +extern inline uint64_t bbq_off(struct bbq * q, uint64_t x) +{ + return x & q->off_mask; +} + +extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x) +{ + return x >> q->idx_bits; +} + +extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x) +{ + return x >> q->off_bits; +} + +extern inline uint64_t set_cur_vsn(struct bbq * q, uint64_t ver) +{ + return ver << q->off_bits; +} + +// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 +// #define BBQ_MEMORY +enum bbq_module +{ + BBQ_MODULE_QUEUE = 0, + BBQ_MODULE_QUEUE_BLOCK_NB, + BBQ_MODULE_QUEUE_BLOCK_ENTRY, + BBQ_MODULE_MAX, +}; + +#ifdef BBQ_MEMORY +#define BBQ_MEM_MAGIC 0xFF +struct bbq_memory_s +{ + aotmic_uint64 malloc_cnt; + aotmic_uint64 malloc_size; + aotmic_uint64 free_cnt; + aotmic_uint64 free_size; +}; +struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0}; +#endif + +static void * bbq_malloc(enum bbq_module module, int socket_id, size_t size) +{ + void * ptr = NULL; + if (socket_id >= 0) + { + ptr = numa_alloc_onnode(size, 0); + } + else + { + ptr = malloc(size); + } +#ifdef BBQ_MEMORY + if (ptr != NULL) + { + atomic_fetch_add(&bbq_memory_g[module].malloc_cnt, 1); + atomic_fetch_add(&bbq_memory_g[module].malloc_size, size); + } +#else + AVOID_WARNING(module); +#endif + return ptr; +} + +static void bbq_free(enum bbq_module module, int socket_id, void * ptr, size_t size) +{ + if (socket_id >= 0) + { + numa_free(ptr, size); + } + else + { + free(ptr); + } + +#ifdef BBQ_MEMORY + if (ptr != NULL) + { + atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1); + atomic_fetch_add(&bbq_memory_g[module].free_size, size); + } +#else + AVOID_WARNING(module); +#endif +} + +/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ +uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd) +{ + uint64_t old_value = 0; + do + { + old_value = atomic_load(atom); + } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd)); + + return old_value; +} + +/* 检查参数是否为2的N次幂 */ +bool bbq_check_power_of_two(uint32_t n) +{ + if (n <= 0) + { + return false; + } + + return (n & (n - 1)) == 0; +} + +/* 根据entries大小返回合理的block个数 + * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ +uint32_t bbq_blocks_calc(uint32_t entries) +{ + double log_entries = log2((double)entries); + uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 + uint32_t max_value = (over4 > 1) ? over4 : 1; + uint32_t n = pow(2, max_value); + return n; +} + +/* 块初始化 */ +int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) +{ +#ifdef BBQ_MEMORY + // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 + block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); + char * last_entry = block->entries + q->entry_size * q->bs; + memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); +#else + block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); +#endif + + if (block->entries == NULL) + { + BBQ_ERR_LOG("bbq_malloc error"); + return BBQ_ALLOC_ERR; + } + + block->committed = ATOMIC_VAR_INIT(0); + block->allocated = ATOMIC_VAR_INIT(0); + block->reserved = ATOMIC_VAR_INIT(0); + block->consumed = ATOMIC_VAR_INIT(0); + + if (cursor_init) + { + // block数组里,除了第一块之外需要设置 + block->committed = ATOMIC_VAR_INIT(q->bs); + block->allocated = ATOMIC_VAR_INIT(q->bs); + block->reserved = ATOMIC_VAR_INIT(q->bs); + if (BBQ_F_CHK_DROP_OLD(q->flags)) + { + block->consumed = ATOMIC_VAR_INIT(0); + } + else + { + block->consumed = ATOMIC_VAR_INIT(q->bs); + } + } + + return BBQ_OK; +} + +/* 块清理函数,与block_init成对*/ +void block_destory(struct bbq * q, struct bbq_block * block) +{ + if (block->entries) + { +#ifdef BBQ_MEMORY + bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, + sizeof(*block->entries) * (q->bs + 1) * q->entry_size); +#else + bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, + sizeof(*block->entries) * q->bs * q->entry_size); +#endif + block->entries = NULL; + } +} + +/* +求x在二进制表示中最高位1所在的位置,x参数不能为0。 +例如:x=1,return 0 (...1) +x=3,return 1 (..11) +x=9,return 3 (1..1) +*/ +unsigned floor_log2(uint64_t x) +{ + return x == 1 ? 0 : 1 + floor_log2(x >> 1); +} + +/* +返回以2为底x的对数,并向上取整值。 +例如:x=1,return 0 (2^0=1) +x=99, return 7(2^6=64 2^7=128) +*/ +unsigned ceil_log2(uint64_t x) +{ + return x == 1 ? 0 : floor_log2(x - 1) + 1; +} + +/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ +static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) +{ + int ret = 0; + + if (bbq_check_power_of_two(bn) == false) + { + BBQ_ERR_LOG("block number is not power of two, now is :%u", bn); + return NULL; + } + + if (bbq_check_power_of_two(bs) == false) + { + BBQ_ERR_LOG("block size is not power of two, now is :%u", bs); + return NULL; + } + + if (obj_size == 0) + { + BBQ_ERR_LOG("obj_size is 0"); + return NULL; + } + + if (numa_available() < 0) + { + // 不支持numa,设置 + socket_id = BBQ_SOCKET_ID_ANY; + } + + struct bbq * q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); + if (q == NULL) + { + BBQ_ERR_LOG("malloc for bbq queue error"); + return NULL; + } + memset(q, 0, sizeof(*q)); + + q->bn = bn; + q->bs = bs; + q->entry_size = obj_size; + q->socket_id = socket_id; + q->phead = ATOMIC_VAR_INIT(0); + q->chead = ATOMIC_VAR_INIT(0); + q->flags = flags; + + q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + if (q->blocks == NULL) + { + BBQ_ERR_LOG("bbq malloc for blocks error"); + goto error; + } + memset(q->blocks, 0, sizeof(*q->blocks)); + + for (uint32_t i = 0; i < bn; ++i) + { + // 第一个block不需要设置cursor_init + bool cursor_init = (i == 0 ? false : true); + ret = block_init(q, &(q->blocks[i]), cursor_init); + if (ret != BBQ_OK) + { + BBQ_ERR_LOG("bbq block init error"); + goto error; + } + } + + q->idx_bits = ceil_log2(bn); + q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 + + q->idx_mask = (1 << q->idx_bits) - 1; + q->off_mask = (1 << q->off_bits) - 1; + + return q; + +error: + bbq_destory(q); + return NULL; +} + +struct bbq * bbq_create_bnbs(uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) +{ + return __bbq_create_bnbs(bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); +} + +struct bbq * bbq_create_bnbs_elem(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) +{ + return __bbq_create_bnbs(bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); +} + +/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL + */ +struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_id, uint32_t flags) +{ + if (bbq_check_power_of_two(count) == false || count == 1) + { + BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + return NULL; + } + + uint32_t bn = bbq_blocks_calc(count); + uint32_t bs = count / bn; + + return bbq_create_bnbs_elem(bn, bs, obj_size, socket_id, flags); +} + +struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags) +{ + if (bbq_check_power_of_two(count) == false || count == 1) + { + BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + return NULL; + } + + uint32_t bn = bbq_blocks_calc(count); + uint32_t bs = count / bn; + + return bbq_create_bnbs(bn, bs, socket_id, flags); +} + +/* 释放消息队列,与bbq_ring_create系列接口成对*/ +void bbq_destory(struct bbq * q) +{ + if (q == NULL) + { + return; + } + + for (uint32_t i = 0; i < q->bn; ++i) + { + block_destory(q, &(q->blocks[i])); + } + + bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); + bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q)); +} + +#define BBQ_DATA_TYPE_SINGLE 0x0 +#define BBQ_DATA_TYPE_ARRAY_1D 0x1 +#define BBQ_DATA_TYPE_ARRAY_2D 0x2 +void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, uint32_t data_type) +{ + size_t idx = e->off * q->entry_size; + + if (BBQ_F_CHK_VALUE(q->flags)) + { + // 数据入队列 + switch (data_type) + { + case BBQ_DATA_TYPE_ARRAY_1D: + case BBQ_DATA_TYPE_SINGLE: + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_ARRAY_2D: { + void ** tmp = (void **)data; + char * entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) + { + memcpy(entry, *tmp, q->entry_size); + entry += q->entry_size; + tmp++; + } + break; + } + default: + break; + } + } + else + { + // 指针入队列 + switch (data_type) + { + case BBQ_DATA_TYPE_ARRAY_2D: + case BBQ_DATA_TYPE_SINGLE: + // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0]) + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_ARRAY_1D: + break; + default: + break; + } + } + atomic_fetch_add(&e->block->committed, e->actual_burst); +} + +struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block, uint32_t n) +{ + struct bbq_queue_state_s state = {0}; + if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) + { + state.state = BBQ_BLOCK_DONE; + return state; + } + + uint64_t old = atomic_fetch_add(&block->allocated, n); + uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed)); + + // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 + uint64_t cur_vsn = bbq_cur_vsn(q, old); + uint64_t cur_off = bbq_off(q, old); + if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) + { + state.state = BBQ_BLOCK_DONE; + return state; + } + + if (cur_off + n <= q->bs) + { + // 可以全部入队 + state.e.actual_burst = n; + } + else + { + // 部分入队 + state.e.actual_burst = q->bs - cur_off; + } + state.e.block = block; + state.e.vsn = cur_vsn; + state.e.off = cur_off; + state.state = BBQ_ALLOCATED; + + return state; +} + +enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) +{ + // 获取下一个block + uint64_t cur = 0; + struct bbq_block * n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); + uint64_t ph_vsn = bbq_head_vsn(q, ph); + + if (BBQ_F_CHK_DROP_OLD(q->flags)) + { + cur = atomic_load(&n_blk->committed); + // 生产者避免前进到上一轮中尚未完全提交的区块 + if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) + { + return BBQ_NOT_AVAILABLE; + } + } + else + { + cur = atomic_load(&n_blk->consumed); + uint64_t reserved; + uint64_t consumed_off = bbq_off(q, cur); + uint64_t consumed_vsn = bbq_cur_vsn(q, cur); + + if (consumed_vsn < ph_vsn || // 生产者赶上了消费者 + (consumed_vsn == ph_vsn && consumed_off != q->bs)) + { + reserved = atomic_load(&n_blk->reserved); + if (bbq_off(q, reserved) == consumed_off) + { + return BBQ_NO_ENTRY; + } + else + { + return BBQ_NOT_AVAILABLE; + } + } + } + + // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。 + uint64_t new_vsn = set_cur_vsn(q, ph_vsn + 1); + fetch_max(&n_blk->committed, new_vsn); + fetch_max(&n_blk->allocated, new_vsn); + + // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1 + fetch_max(&q->phead, ph + 1); + return BBQ_SUCCESS; +} + +static uint32_t bbq_free_space_set(struct bbq * q, int32_t ret_status, uint64_t ph, struct bbq_block * blk_ph) +{ + if (ret_status == BBQ_QUEUE_FULL) + { + return 0; + } + + uint64_t ch = atomic_load(&q->chead); + struct bbq_block * blk_ch = &(q->blocks[bbq_idx(q, ch)]); + + // uint64_t ph_vsn = bbq_head_vsn(q, ph); + // uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t ph_idx = bbq_idx(q, ph); + uint64_t ch_idx = bbq_idx(q, ch); + uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed)); + uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved)); + + if (BBQ_F_CHK_DROP_OLD(q->flags)) + { + // TODO + return 0; + } + else + { + // 生产者到消费者的距离 + uint64_t idx_diff = ph_idx >= ch_idx ? q->bn - ph_idx + ch_idx : ch_idx - ph_idx; + return (idx_diff - 1) * q->bs + (q->bs - committed_off + reserved_off); + } +} + +/* 消息队列入队 */ +static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32_t n, uint32_t flag, + uint32_t * free_space) +{ + struct bbq_status ret = {.status = 0, .actual_burst = 0}; + + if (q == NULL || data == NULL) + { + ret.status = BBQ_NULL_PTR; + return ret; + } + + while (true) + { + // 获取当前phead,转为索引后获取到当前的blk + uint64_t ph = atomic_load(&q->phead); + struct bbq_block * blk = &(q->blocks[bbq_idx(q, ph)]); + struct bbq_queue_state_s state = allocate_entry(q, blk, n); + + switch (state.state) + { + case BBQ_ALLOCATED: + commit_entry(q, &state.e, data, flag); + ret.actual_burst = state.e.actual_burst; + ret.status = BBQ_OK; + break; + case BBQ_BLOCK_DONE: { + enum bbq_queue_state pstate = advance_phead(q, ph); + if (pstate == BBQ_SUCCESS) + { + continue; + } + + if (pstate == BBQ_NO_ENTRY) + { + ret.status = BBQ_QUEUE_FULL; + } + else if (pstate == BBQ_NOT_AVAILABLE) + { + ret.status = BBQ_QUEUE_BUSY; + } + else + { + ret.status = BBQ_ERROR; + } + + break; + } + default: + ret.status = BBQ_ERROR; + break; + } + + if (free_space != NULL) + { + *free_space = bbq_free_space_set(q, ret.status, ph, blk); + } + + return ret; + } +} + +int bbq_enqueue(struct bbq * q, void * const * data) +{ + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +int bbq_enqueue_elem(struct bbq * q, void const * data) +{ + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +/* 更新成功 reserve成功的个数 */ +uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) +{ + // TODO:逻辑可以合并 + if (n == 1) + { + // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 + if (fetch_max(aotmic, reserved + 1) == reserved) + { + return 1; + } + + return 0; + } + else + { + bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n); + return ret == true ? n : 0; + } +} + +struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n) +{ + while (true) + { + struct bbq_queue_state_s state; + uint64_t reserved = atomic_load(&block->reserved); + uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_svn = bbq_cur_vsn(q, reserved); + + if (reserved_off < q->bs) + { + uint64_t consumed = atomic_load(&block->consumed); + // TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理? + // TODO:这个情况可能出现? + if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) + { + // consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到 + state.state = BBQ_BLOCK_DONE; + state.vsn = reserved_svn; + return state; + } + + uint64_t committed = atomic_load(&block->committed); + uint64_t committed_off = bbq_off(q, committed); + if (committed_off == reserved_off) + { + state.state = BBQ_NO_ENTRY; + return state; + } + + // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 + if (committed_off != q->bs) + { + uint64_t allocated = atomic_load(&block->allocated); + if (bbq_off(q, allocated) != committed_off) + { + state.state = BBQ_NOT_AVAILABLE; + return state; + } + } + + uint32_t tmp = committed_off - reserved_off; + uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + if (reserved_cnt > 0) + { // TODO:多entry时关注 + state.state = BBQ_RESERVED; + state.e.actual_burst = reserved_cnt; + state.e.block = block; + state.e.off = reserved_off; + state.e.vsn = reserved_svn; + + return state; + } + else + { + // 如果不等于代表block.reserved被其他线程Reserved了 + continue; + } + } + + state.state = BBQ_BLOCK_DONE; + state.vsn = reserved_svn; + return state; + } +} + +bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, uint32_t data_type) +{ + size_t idx = e->off * q->entry_size; + + if (BBQ_F_CHK_VALUE(q->flags)) + { + switch (data_type) + { + case BBQ_DATA_TYPE_ARRAY_1D: + case BBQ_DATA_TYPE_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_ARRAY_2D: { + void ** tmp = (void **)deq_data; + char * entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) + { + memcpy(*tmp, entry, q->entry_size); + entry += q->entry_size; + tmp++; + } + break; + } + default: + break; + } + } + else + { + switch (data_type) + { + case BBQ_DATA_TYPE_ARRAY_2D: + case BBQ_DATA_TYPE_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_ARRAY_1D: + default: + break; + } + } + + uint64_t allocated; + if (BBQ_F_CHK_DROP_OLD(q->flags)) + { + // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? + allocated = atomic_load(&e->block->allocated); + // 预留的entry所在的块,已经被新生产的数据赶上了 + if (bbq_cur_vsn(q, allocated) != e->vsn) + { + return false; + } + } + else + { + atomic_fetch_add(&e->block->consumed, e->actual_burst); + } + + return true; +} + +bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) +{ + uint64_t ch_idx = bbq_idx(q, ch); + struct bbq_block * n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); + + uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t committed = atomic_load(&n_blk->committed); + uint64_t committed_vsn = bbq_cur_vsn(q, committed); + if (BBQ_F_CHK_DROP_OLD(q->flags)) + { + // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. + // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 + if (committed_vsn < ver + (ch_idx == 0)) + return false; + fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn)); + } + else + { + if (committed_vsn != ch_vsn + 1) + { + // 消费者追上了生产者,下一块还未开始生产 + return false; + } + uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1); + fetch_max(&n_blk->consumed, new_vsn); + fetch_max(&n_blk->reserved, new_vsn); + } + + fetch_max(&q->chead, ch + 1); + return true; +} + +/* 消息队列出队 */ +static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type) +{ + struct bbq_status ret = {.status = 0, .actual_burst = 0}; + if (q == NULL || deq_data == NULL) + { + ret.status = BBQ_NULL_PTR; + return ret; + } + + while (true) + { + uint64_t ch = atomic_load(&q->chead); + struct bbq_block * blk = &(q->blocks[bbq_idx(q, ch)]); + + struct bbq_queue_state_s state; + state = reserve_entry(q, blk, n); + + switch (state.state) + { + case BBQ_RESERVED: + if (consume_entry(q, &state.e, deq_data, data_type)) + { + ret.status = BBQ_OK; + ret.actual_burst = state.e.actual_burst; + return ret; + } + else + { + continue; + } + case BBQ_NO_ENTRY: + ret.status = BBQ_QUEUE_EMPTY; + return ret; + case BBQ_NOT_AVAILABLE: + ret.status = BBQ_QUEUE_BUSY; + return ret; + case BBQ_BLOCK_DONE: + if (advance_chead(q, ch, state.vsn)) + { + continue; + } + else + { + ret.status = BBQ_QUEUE_EMPTY; + return ret; + } + default: + ret.status = BBQ_ERROR; + return ret; + } + } +} + +int bbq_dequeue(struct bbq * q, void ** data) +{ + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE); + return ret.status; +} + +int bbq_dequeue_elem(struct bbq * q, void * data) +{ + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE); + return ret.status; +} + +uint32_t bbq_max_burst(struct bbq * q, uint32_t n) +{ + uint32_t burst = n; + if (burst > q->bs) + { + burst = q->bs; + } + + return burst; +} + +static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n) +{ + if (q == NULL || obj_table == NULL) + { + return BBQ_NULL_PTR; + } + + if (!BBQ_F_CHK_VALUE(q->flags)) + { + return BBQ_QUEUE_DATA_ERR; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void * obj = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) + { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D); + if (ret.status != BBQ_OK) + { + break; + } + obj += q->entry_size * ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n) +{ + if (q == NULL || obj_table == NULL) + { + return BBQ_NULL_PTR; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void ** obj_table_tmp = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) + { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D); + if (ret.status != BBQ_OK) + { + break; + } + obj_table_tmp += ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ +uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n) +{ + if (q == NULL || obj_table == NULL) + { + return BBQ_NULL_PTR; + } + + if (!BBQ_F_CHK_VALUE(q->flags)) + { + return BBQ_QUEUE_DATA_ERR; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void const * obj = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) + { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, NULL); + if (ret.status != BBQ_OK) + { + break; + } + obj += q->entry_size * ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ +uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space) +{ + if (q == NULL || obj_table == NULL) + { + return BBQ_NULL_PTR; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void * const * obj_table_tmp = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) + { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, free_space); + if (ret.status != BBQ_OK) + { + break; + } + obj_table_tmp += ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +bool bbq_empty(struct bbq * q) +{ + uint64_t phead = atomic_load(&q->phead); + uint64_t chead = atomic_load(&q->chead); + + uint64_t ph_vsn = bbq_head_vsn(q, phead); + uint64_t ch_vsn = bbq_head_vsn(q, chead); + uint64_t ph_idx = bbq_idx(q, phead); + uint64_t ch_idx = bbq_idx(q, chead); + + struct bbq_block * block; + + if (ph_idx != ch_idx) + { + return false; + } + + block = &q->blocks[ph_idx]; + if (ph_vsn == ch_vsn) + { + if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) + { + return true; + } + } + + bbq_cursor reserved = atomic_load(&block->reserved); + uint64_t reserved_off = bbq_off(q, reserved); + + if (BBQ_F_CHK_DROP_OLD(q->flags) && ph_vsn > ch_vsn && reserved_off != q->bs) + { + // 生产者追上了消费者,当前块以及未消费的全部 + // 如果reserved指向当前块的最后一个entry,可以移动head消费下一块,否则返回空 + return true; + } + + return false; +} + +uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n) +{ + return bbq_enqueue_burst_one_dimensional(q, obj_table, n); +} + +uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n) +{ + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, NULL); +} + +uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space) +{ + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, free_space); +} + +uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n) +{ + return bbq_dequeue_burst_two_dimensional(q, obj_table, n); +} + +uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n) +{ + return bbq_dequeue_burst_one_dimensional(q, obj_table, n); +} + +bool bbq_malloc_free_equal() +{ +#ifdef BBQ_MEMORY + bool ret = true; + for (int i = 0; i < BBQ_MODULE_MAX; i++) + { + uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); + uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); + if (malloc_cnt != free_cnt) + { + BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", i, malloc_cnt, free_cnt); + ret = false; + } + + uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size); + uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size); + if (malloc_size != free_size) + { + BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, + free_size); + ret = false; + } + } + return ret; +#else + return true; +#endif +} + +bool bbq_debug_check_array_bounds(struct bbq * q) +{ +#ifdef BBQ_MEMORY + void * value = malloc(q->entry_size); + memset(value, BBQ_MEM_MAGIC, q->entry_size); + + for (size_t i = 0; i < q->bn; i++) + { + // 针对内存检查版本,申请了bs+1个entry + char * last_entry = q->blocks[i].entries + q->bs * q->entry_size; + if (memcmp(last_entry, value, q->entry_size) != 0) + { + return false; + } + } +#else + AVOID_WARNING(q); +#endif + return true; +} + +void bbq_debug_memory_print() +{ +#ifdef BBQ_MEMORY + for (int i = 0; i < BBQ_MODULE_MAX; i++) + { + uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); + uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); + if (malloc_cnt == 0 && free_cnt == 0) + { + continue; + } + + printf("[%d]bbq malloc:%lu free:%lu\n", i, atomic_load(&bbq_memory_g[i].malloc_cnt), + atomic_load(&bbq_memory_g[i].free_cnt)); + } + + if (bbq_malloc_free_equal()) + { + printf("all memory free\n"); + } + else + { + BBQ_ERR_LOG("memory not all free"); + } +#endif +} + +void bbq_debug_block_print(struct bbq * q, struct bbq_block * block) +{ + bbq_cursor allocated = atomic_load(&block->allocated); + bbq_cursor committed = atomic_load(&block->committed); + bbq_cursor reserved = atomic_load(&block->reserved); + bbq_cursor consumed = atomic_load(&block->consumed); + printf(" allocated:%lu committed:%lu reserved:%lu", bbq_off(q, allocated), bbq_off(q, committed), + bbq_off(q, reserved)); + if (BBQ_F_CHK_DROP_OLD(q->flags)) + { + printf("\n"); + } + else + { + printf(" consumed:%lu\n", bbq_off(q, consumed)); + } +} + +void bbq_debug_struct_print(struct bbq * q) +{ + printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); + uint64_t phead = atomic_load(&q->phead); + uint64_t chead = atomic_load(&q->chead); + + printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs); + printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead)); + + uint64_t ph_idx = bbq_idx(q, phead); + uint64_t ch_idx = bbq_idx(q, chead); + if (ph_idx != ch_idx) + { + printf("block[%lu]\n", ph_idx); + bbq_debug_block_print(q, &(q->blocks[ph_idx])); + } + + printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead)); + printf("block[%lu]\n", ch_idx); + bbq_debug_block_print(q, &(q->blocks[ch_idx])); +} \ No newline at end of file diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index 0dd413a..489b708 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -25,6 +25,7 @@ #include #include +#include "bbq.h" #include "dp_trace.h" #include "vnode_common.h" #include @@ -40,15 +41,18 @@ * +-------------+-------------+-------------+------------+------------+ * Len = sizeof(nr_prodq) + sizeof(nr_consq) + sizeof(descs) * nr_prodq * nr_consq */ - static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_exclusive, unsigned int sz_shared, unsigned int sz_buffer) { struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc)); MR_VERIFY_MALLOC(desc); - desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, - RING_F_SC_DEQ | RING_F_SP_ENQ); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + desc->tunnel_object = bbq_create(sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); +#else + desc->tunnel_object = + rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); +#endif if (desc->tunnel_object == NULL) { @@ -77,7 +81,11 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl errout: if (desc->tunnel_object != NULL) { +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + bbq_destory(desc->tunnel_object); +#else rte_ring_free(desc->tunnel_object); +#endif desc->tunnel_object = NULL; } @@ -104,7 +112,11 @@ static int tunnel_delete(struct tunnel_desc * desc) } struct rte_mbuf * mbuf; +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + while (bbq_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) +#else while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) +#endif { infra_rte_pktmbuf_free(mbuf); } @@ -116,10 +128,19 @@ static int tunnel_delete(struct tunnel_desc * desc) infra_rte_pktmbuf_free(desc->rt_buffer[i]); } +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + MR_VERIFY_2(bbq_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); +#else MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); +#endif rte_free(desc->en_buffer); rte_free(desc->rt_buffer); + +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + bbq_destory(desc->tunnel_object); +#else rte_ring_free(desc->tunnel_object); +#endif rte_free(desc); return 0; } diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h index 4dc35b9..ae630a0 100644 --- a/infra/src/vnode_common.h +++ b/infra/src/vnode_common.h @@ -26,8 +26,12 @@ struct tunnel_desc /* Tunnel Name */ char symbol[MR_SYMBOL_MAX]; +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + struct bbq * tunnel_object; +#else /* Tunel Object, real object to hold pointers */ struct rte_ring * tunnel_object; +#endif /* Tunnel Size */ unsigned int tunnel_size; diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c index 82760be..78afc90 100644 --- a/infra/src/vnode_mirror.c +++ b/infra/src/vnode_mirror.c @@ -4,6 +4,7 @@ #include #include +#include "bbq.h" #include "dp_trace.h" #include "vnode_common.h" @@ -21,7 +22,8 @@ static inline unsigned int dist_tunnel_rt_objects_retrieve(struct tunnel_desc * return nr_rt_objs; } -static inline bool dist_tunnel_acquire_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, int32_t credits) +static inline bool dist_tunnel_acquire_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, + int32_t credits) { int32_t inflight_credits = tunnel_desc->inflight_credits; int32_t missing_credits = credits - inflight_credits; @@ -50,8 +52,8 @@ static inline bool dist_tunnel_acquire_credits(struct vnode * vnode_desc, struct } /* This is a race, no locks are involved, and thus some other - * thread can allocate tokens in between the check and the - * allocation. + * thread can allocate tokens in between the check and the + * allocation. */ new_total_on_loan = __atomic_fetch_add(&vnode_desc->credits_on_loan, acquired_credits, __ATOMIC_RELAXED) + acquired_credits; @@ -67,7 +69,8 @@ static inline bool dist_tunnel_acquire_credits(struct vnode * vnode_desc, struct return true; } -static inline void dist_tunnel_return_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, int32_t credits) +static inline void dist_tunnel_return_credits(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, + int32_t credits) { tunnel_desc->inflight_credits += credits; @@ -122,7 +125,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons /* acquire credit, if failed, drop all the packets */ if (likely(is_acquire_credit_success)) { +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); +#else n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); +#endif } else { @@ -213,9 +220,14 @@ out: #endif } -static inline int dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj, unsigned int nr_max_obj) +static inline int dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj, + unsigned int nr_max_obj) { +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj); +#else unsigned int nr_deq = rte_ring_sc_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL); +#endif dist_tunnel_return_credits(vnode_desc, tunnel_desc, (int32_t)nr_deq); return (int)nr_deq; } @@ -233,8 +245,8 @@ static inline void dist_tunnel_block_flush(struct tunnel_block * block, int prod } } -static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, unsigned int prodq, struct rte_mbuf * obj[], - uint32_t hash[], unsigned int nr_obj) +static inline void dist_tunnel_block_enqueue_with_hash(struct tunnel_block * block, unsigned int prodq, + struct rte_mbuf * obj[], uint32_t hash[], unsigned int nr_obj) { assert(nr_obj <= MR_LIBVNODE_MAX_SZ_BURST); for (unsigned int i = 0; i < nr_obj; i++) diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc index 0cc8756..7101e71 100644 --- a/infra/test/TestVNode.cc +++ b/infra/test/TestVNode.cc @@ -3,9 +3,9 @@ extern "C" { -#include -#include #include +#include +#include #include } @@ -242,8 +242,11 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objects, RTE_DIM(rt_objects)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 480); +#else EXPECT_EQ(rt_ret, 481); - +#endif rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); @@ -253,7 +256,11 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objects, RTE_DIM(rt_objects)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 480); +#else EXPECT_EQ(rt_ret, 481); +#endif rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); @@ -264,7 +271,11 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 2, rt_objects, RTE_DIM(rt_objects)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 480); +#else EXPECT_EQ(rt_ret, 481); +#endif rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); @@ -275,7 +286,11 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 3, rt_objects, RTE_DIM(rt_objects)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 480); +#else EXPECT_EQ(rt_ret, 481); +#endif rte_pktmbuf_free_bulk(enq_objs, rt_ret); }); @@ -299,18 +314,33 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) } EXPECT_EQ(prod_on_line_total, 2048); + +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(prod_deliver_total, 128); + EXPECT_EQ(prod_missed_total, 1920); +#else EXPECT_EQ(prod_deliver_total, 124); EXPECT_EQ(prod_missed_total, 1924); +#endif /* on cons side */ struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat[0].on_line, 2048); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(cons_stat[0].deliver, 128); + EXPECT_EQ(cons_stat[0].missed, 1920); +#else EXPECT_EQ(cons_stat[0].deliver, 124); EXPECT_EQ(cons_stat[0].missed, 1924); +#endif struct rte_mbuf * deq_objs[TEST_MBUFS_COUNT]; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(deq_ret, 128); +#else EXPECT_EQ(deq_ret, 124); +#endif rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); @@ -349,7 +379,11 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) EXPECT_EQ(enq_ret, 0); int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 480); +#else EXPECT_EQ(rt_ret, 481); +#endif rte_pktmbuf_free_bulk(deq_objs, rt_ret); /* at here, the ring is full, no object can be enqueue */ @@ -362,17 +396,32 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) /* until here, we have 31 objects enqueue, so only 544 mbufs can be enqueue. */ int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); + +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(deq_ret_1, 32); +#else EXPECT_EQ(deq_ret_1, 31); +#endif struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod); EXPECT_EQ(prod_stat->on_line, 1024); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(prod_stat->deliver, 32); + EXPECT_EQ(prod_stat->missed, 992); +#else EXPECT_EQ(prod_stat->deliver, 31); EXPECT_EQ(prod_stat->missed, 993); +#endif struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat->on_line, 1024); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(cons_stat->deliver, 32); + EXPECT_EQ(cons_stat->missed, 992); +#else EXPECT_EQ(cons_stat->deliver, 31); EXPECT_EQ(cons_stat->missed, 993); +#endif /* free these mbufs */ rte_pktmbuf_free_bulk(deq_objs, deq_ret_1); @@ -385,7 +434,11 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) /* get the packet needs to be free */ int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret_3, 480); +#else EXPECT_EQ(rt_ret_3, 481); +#endif rte_pktmbuf_free_bulk(deq_objs, rt_ret_3); int enq_ret_4 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 1536, enq_hashs, 512); @@ -396,19 +449,32 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) rte_pktmbuf_free_bulk(deq_objs, rt_ret_4); int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(deq_ret_2, 32); +#else EXPECT_EQ(deq_ret_2, 31); - +#endif /* another round, the stat should be double */ prod_stat = vnode_mirror_prod_stat_get(prod); cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(prod_stat->on_line, 2048); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(prod_stat->deliver, 64); + EXPECT_EQ(prod_stat->missed, 1984); +#else EXPECT_EQ(prod_stat->deliver, 62); EXPECT_EQ(prod_stat->missed, 1986); +#endif EXPECT_EQ(cons_stat->on_line, 2048); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(cons_stat->deliver, 64); + EXPECT_EQ(cons_stat->missed, 1984); +#else EXPECT_EQ(cons_stat->deliver, 62); EXPECT_EQ(cons_stat->missed, 1986); +#endif rte_pktmbuf_free_bulk(deq_objs, deq_ret_2); vnode_mirror_delete(vnode_ptr); @@ -467,7 +533,11 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* retrieve the drop packets */ struct rte_mbuf * rt_objs[128] = {}; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objs, 128); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 32); +#else EXPECT_EQ(rt_ret, 33); +#endif rte_pktmbuf_free_bulk(rt_objs, rt_ret); /* third 32 mbufs, another queue, 31 packet should be enqueue. */ @@ -480,13 +550,21 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* retrieve the drop packets */ rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objs, 128); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 32); +#else EXPECT_EQ(rt_ret, 33); +#endif rte_pktmbuf_free_bulk(rt_objs, rt_ret); /* dequeue */ struct rte_mbuf * deq_objs[128] = {}; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(deq_ret, 64); +#else EXPECT_EQ(deq_ret, 62); +#endif rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); @@ -529,8 +607,12 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) EXPECT_EQ(enq_ret, 0); int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(rt_ret, 0); +#else EXPECT_EQ(rt_ret, 1); rte_pktmbuf_free_bulk(deq_objs, rt_ret); +#endif /* second 512 mbufs should be rejected */ int enq_ret_2 = vnode_mirror_enqueue_bulk(prod, 0, enq_objs + 512, enq_hashs, 512); @@ -558,8 +640,11 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) rte_pktmbuf_free_bulk(deq_objs, 512); int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(deq_ret, 512); +#else EXPECT_EQ(deq_ret, 511); - +#endif rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); } @@ -959,7 +1044,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(enq_ret, 0); } - for(unsigned int i = 0; i < 8; i++) + for (unsigned int i = 0; i < 8; i++) { vnode_mirror_flush(prod, i); } @@ -969,8 +1054,15 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(cons_stat[0].on_line, 32); EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); + +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + // 32/8=4 + EXPECT_EQ(cons_stat[0].q_len_max, 4); + EXPECT_LE(cons_stat[0].q_len_avg_max, 4); +#else EXPECT_EQ(cons_stat[0].q_len_max, 5); EXPECT_LE(cons_stat[0].q_len_avg_max, 5); +#endif int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); EXPECT_EQ(deq_ret, 32); @@ -984,8 +1076,13 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(cons_stat[0].on_line, 32); EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); +#if defined(BBQ_SPSC) || defined(BBQ_MPSC) + EXPECT_EQ(cons_stat[0].q_len_max, 4); + EXPECT_LE(cons_stat[0].q_len_avg_max, 4); // TODO:check??? +#else EXPECT_EQ(cons_stat[0].q_len_max, 5); EXPECT_LE(cons_stat[0].q_len_avg_max, 32); +#endif rte_pktmbuf_free_bulk(deq_objs, deq_ret); vnode_mirror_delete(vnode_ptr); -- cgit v1.2.3 From c6735742b1990c65fc67fd4e2873fa1640dc4f7d Mon Sep 17 00:00:00 2001 From: liuyu Date: Wed, 26 Jun 2024 11:37:09 +0800 Subject: bugfix:更新bbq库实现,添加error number,修复burst失败时返回非0bug。 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/include/bbq.h | 177 ++++++++++++------- infra/include/vnode.h | 2 +- infra/src/bbq.c | 437 +++++++++++++++++++++++++++-------------------- infra/src/vnode_common.c | 12 +- infra/src/vnode_common.h | 2 +- infra/src/vnode_mirror.c | 16 +- infra/test/TestVNode.cc | 44 ++--- 7 files changed, 410 insertions(+), 280 deletions(-) diff --git a/infra/include/bbq.h b/infra/include/bbq.h index 6e4d84e..d8afb3d 100644 --- a/infra/include/bbq.h +++ b/infra/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: liuyu@geedgenetworks.com - * @LastEditTime: 2024-06-20 18:03:59 + * @LastEditTime: 2024-06-25 16:49:38 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -26,6 +26,7 @@ using aotmic_uint64 = std::atomic; #define BBQ_SOCKET_ID_ANY -1 #define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) +#define BBQ_SYMBOL_MAX 64 struct bbq_block { @@ -33,43 +34,40 @@ struct bbq_block bbq_cursor allocated; // 已分配(version|offset) bbq_cursor reserved; // 已预留(version|offset) bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char * entries; // 存储大小可变的entry,分配空间大小:bs * entry_size + char * entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; struct bbq { - size_t bn; // blocks的个数 - size_t bs; // blocks.entries的个数 - size_t entry_size; // blocks.entries里每个entry的大小 + char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; - int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 - uint32_t flags; // 标记:retry new 模式,还是drop old模式 - uint32_t idx_bits; // bbq_head里idx所占的位数 - uint32_t off_bits; // bbq_cursor里offset所占的位数 - - uint64_t idx_mask; // idx_bits偏移后的掩码 - uint64_t off_mask; // off_bits偏移后的掩码 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t bn; // blocks的个数 + uint32_t bs; // blocks.entries的个数 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + uint64_t entry_size; // blocks.entries里每个entry的大小 + bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx struct bbq_block * blocks; // bn大小的数组 -}; - -#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ -// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */ -// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */ +} __BBQ_CACHE_ALIGNED; #define BBQ_F_DEFAULT 0x0 +#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ #define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ -#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */ -#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */ /** * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 * 对应入队函数:bbq_enqueue、bbq_enqueue_burst * 对应出队函数:bbq_dequeue、bbq_dequeue_burst * + * @param[in] name + * 队列名称 * @param[in] count - * 队列所有entry的个数,count必须大于1,且是2的N次方。 + * 队列大小,参数必须大于1,且是2的N次方。 * @param[in] socket_id * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 * 当检测到不支持多numa,将转为malloc分配内存。 @@ -79,9 +77,13 @@ struct bbq * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 - * NULL:创建失败。 + * NULL:创建失败,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围 + * - BBQ_ERR_ALLOC:申请内存失败 + * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 + * - BBQ_ERR_INPUT_NULL:name传入空指针 */ -extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags); +extern struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags); /** * 消息队列单个指针入队 @@ -89,10 +91,14 @@ extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags); * @param[in] q * 队列指针 * @param[in] data - * 则传入一维指针,如: - * int *data = malloc(sizeof(int));*data = 1; 传入&data + * 指向入队指针的指针,如: + * int *data = malloc(sizeof(int));*data = TEST_DATA; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可能存在以下错误码: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_enqueue(struct bbq * q, void * const * data); @@ -102,15 +108,19 @@ extern int bbq_enqueue(struct bbq * q, void * const * data); * @param[in] q * 队列指针 * @param[out] data - * 则传入二维指针,如: + * 传入二级指针,如: * int *data = NULL; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_dequeue(struct bbq * q, void ** data); /** - * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数 + * 消息队列批量入队(指针入队),尽可能一次入队n个指针,返回实际成功入队个数 * * @param[in] q * 队列指针 @@ -119,41 +129,52 @@ extern int bbq_dequeue(struct bbq * q, void ** data); * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT); * for(int i=0;i #include -// #define BBQ_SPSC +#define BBQ_SPSC #define __DECLARE_COMMON_VNODE_CREATE_PROD(_type) \ struct vnode_prod * vnode_##_type##_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq); diff --git a/infra/src/bbq.c b/infra/src/bbq.c index 992b8f7..6d1a954 100644 --- a/infra/src/bbq.c +++ b/infra/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-20 18:58:05 + * @LastEditTime: 2024-06-25 16:49:28 * @Email: liuyu@geedgenetworks.com * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -12,8 +12,8 @@ #include // flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针" -#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */ -#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ +#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */ +#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ // 判断flags标记位 #define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) @@ -28,6 +28,8 @@ printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \ } while (0) +__thread int32_t bbq_errno; + struct bbq_status { int32_t status; // 返回状态 @@ -46,34 +48,34 @@ enum bbq_queue_state struct bbq_entry_desc { - uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释 - uint64_t off; // entry在当前block的偏移(offset) + uint64_t vsn; // allocated或reserved的版本(vsn) + uint64_t off; // entry在当前块的偏移(offset) uint32_t actual_burst; // 实际出/入队个数 - struct bbq_block * block; // 指向所在的block + struct bbq_block * block; // 指向所在的块 }; struct bbq_queue_state_s { - enum bbq_queue_state state; // 队列状态 - union { // TODO: - uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 + enum bbq_queue_state state; // 队列状态 + union { + uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效 struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效 }; }; -extern inline uint64_t bbq_idx(struct bbq * q, uint64_t x) +extern inline uint64_t bbq_head_idx(struct bbq * q, uint64_t x) { return x & q->idx_mask; } -extern inline uint64_t bbq_off(struct bbq * q, uint64_t x) +extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x) { - return x & q->off_mask; + return x >> q->idx_bits; } -extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x) +extern inline uint64_t bbq_cur_off(struct bbq * q, uint64_t x) { - return x >> q->idx_bits; + return x & q->off_mask; } extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x) @@ -81,18 +83,17 @@ extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x) return x >> q->off_bits; } -extern inline uint64_t set_cur_vsn(struct bbq * q, uint64_t ver) +static inline uint64_t bbq_set_cur_vsn(struct bbq * q, uint64_t ver) { return ver << q->off_bits; } // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -// #define BBQ_MEMORY enum bbq_module { - BBQ_MODULE_QUEUE = 0, - BBQ_MODULE_QUEUE_BLOCK_NB, - BBQ_MODULE_QUEUE_BLOCK_ENTRY, + BBQ_MODULE_MAIN = 0, + BBQ_MODULE_BLOCK_NB, + BBQ_MODULE_BLOCK_ENTRY, BBQ_MODULE_MAX, }; @@ -154,7 +155,7 @@ static void bbq_free(enum bbq_module module, int socket_id, void * ptr, size_t s } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd) +uint64_t bbq_fetch_max(aotmic_uint64 * atom, uint64_t upd) { uint64_t old_value = 0; do @@ -168,7 +169,7 @@ uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd) /* 检查参数是否为2的N次幂 */ bool bbq_check_power_of_two(uint32_t n) { - if (n <= 0) + if (n == 0) { return false; } @@ -178,7 +179,7 @@ bool bbq_check_power_of_two(uint32_t n) /* 根据entries大小返回合理的block个数 * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ -uint32_t bbq_blocks_calc(uint32_t entries) +uint32_t bbq_block_number_calc(uint32_t entries) { double log_entries = log2((double)entries); uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 @@ -192,17 +193,17 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) { #ifdef BBQ_MEMORY // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 - block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); char * last_entry = block->entries + q->entry_size * q->bs; memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else - block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); #endif if (block->entries == NULL) { - BBQ_ERR_LOG("bbq_malloc error"); - return BBQ_ALLOC_ERR; + bbq_errno = BBQ_ERR_ALLOC; + return bbq_errno; } block->committed = ATOMIC_VAR_INIT(0); @@ -218,7 +219,7 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) block->reserved = ATOMIC_VAR_INIT(q->bs); if (BBQ_F_CHK_DROP_OLD(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(0); + block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed } else { @@ -235,11 +236,9 @@ void block_destory(struct bbq * q, struct bbq_block * block) if (block->entries) { #ifdef BBQ_MEMORY - bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, - sizeof(*block->entries) * (q->bs + 1) * q->entry_size); + bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, (q->bs + 1) * q->entry_size); #else - bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, - sizeof(*block->entries) * q->bs * q->entry_size); + bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, q->bs * q->entry_size); #endif block->entries = NULL; } @@ -247,45 +246,51 @@ void block_destory(struct bbq * q, struct bbq_block * block) /* 求x在二进制表示中最高位1所在的位置,x参数不能为0。 -例如:x=1,return 0 (...1) -x=3,return 1 (..11) -x=9,return 3 (1..1) +例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1) */ -unsigned floor_log2(uint64_t x) +static unsigned bbq_floor_log2(uint64_t x) { - return x == 1 ? 0 : 1 + floor_log2(x >> 1); + return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1); } /* 返回以2为底x的对数,并向上取整值。 -例如:x=1,return 0 (2^0=1) -x=99, return 7(2^6=64 2^7=128) +例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128) */ -unsigned ceil_log2(uint64_t x) +static unsigned bbq_ceil_log2(uint64_t x) { - return x == 1 ? 0 : floor_log2(x - 1) + 1; + return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1; } +// 0----------------------------------------------------------------------------- /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ -static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) +static struct bbq * __bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, + uint32_t flags) { int ret = 0; + bbq_errno = BBQ_OK; if (bbq_check_power_of_two(bn) == false) { - BBQ_ERR_LOG("block number is not power of two, now is :%u", bn); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } if (bbq_check_power_of_two(bs) == false) { - BBQ_ERR_LOG("block size is not power of two, now is :%u", bs); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - if (obj_size == 0) + if (name == NULL) { - BBQ_ERR_LOG("obj_size is 0"); + bbq_errno = BBQ_ERR_INPUT_NULL; + return NULL; + } + + if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; return NULL; } @@ -295,14 +300,14 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq * q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); + struct bbq * q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q)); if (q == NULL) { - BBQ_ERR_LOG("malloc for bbq queue error"); + bbq_errno = BBQ_ERR_ALLOC; return NULL; } memset(q, 0, sizeof(*q)); - + ret = snprintf(q->name, sizeof(q->name), "%s", name); q->bn = bn; q->bs = bs; q->entry_size = obj_size; @@ -311,10 +316,10 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, q->chead = ATOMIC_VAR_INIT(0); q->flags = flags; - q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); if (q->blocks == NULL) { - BBQ_ERR_LOG("bbq malloc for blocks error"); + bbq_errno = BBQ_ERR_ALLOC; goto error; } memset(q->blocks, 0, sizeof(*q->blocks)); @@ -326,13 +331,12 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, ret = block_init(q, &(q->blocks[i]), cursor_init); if (ret != BBQ_OK) { - BBQ_ERR_LOG("bbq block init error"); goto error; } } - q->idx_bits = ceil_log2(bn); - q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 + q->idx_bits = bbq_ceil_log2(bn); + q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 q->idx_mask = (1 << q->idx_bits) - 1; q->off_mask = (1 << q->off_bits) - 1; @@ -344,44 +348,59 @@ error: return NULL; } -struct bbq * bbq_create_bnbs(uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) +struct bbq * bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { - return __bbq_create_bnbs(bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); } -struct bbq * bbq_create_bnbs_elem(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) +struct bbq * bbq_create_bnbs_elem(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, + uint32_t flags) { - return __bbq_create_bnbs(bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_id, uint32_t flags) +struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) { - if (bbq_check_power_of_two(count) == false || count == 1) + bbq_errno = BBQ_OK; + if (count <= 1) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; + return NULL; + } + + if (bbq_check_power_of_two(count) == false) { - BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - uint32_t bn = bbq_blocks_calc(count); + uint32_t bn = bbq_block_number_calc(count); uint32_t bs = count / bn; - return bbq_create_bnbs_elem(bn, bs, obj_size, socket_id, flags); + return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags); } -struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags) +struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags) { - if (bbq_check_power_of_two(count) == false || count == 1) + bbq_errno = BBQ_OK; + if (count <= 1) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; + return NULL; + } + + if (bbq_check_power_of_two(count) == false) { - BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - uint32_t bn = bbq_blocks_calc(count); + uint32_t bn = bbq_block_number_calc(count); uint32_t bs = count / bn; - return bbq_create_bnbs(bn, bs, socket_id, flags); + return bbq_create_bnbs(name, bn, bs, socket_id, flags); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ @@ -397,8 +416,8 @@ void bbq_destory(struct bbq * q) block_destory(q, &(q->blocks[i])); } - bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); - bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q)); + bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); + bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q)); } #define BBQ_DATA_TYPE_SINGLE 0x0 @@ -454,7 +473,7 @@ void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block, uint32_t n) { struct bbq_queue_state_s state = {0}; - if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) + if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; @@ -465,7 +484,7 @@ struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 uint64_t cur_vsn = bbq_cur_vsn(q, old); - uint64_t cur_off = bbq_off(q, old); + uint64_t cur_off = bbq_cur_off(q, old); if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) { state.state = BBQ_BLOCK_DONE; @@ -494,14 +513,14 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) { // 获取下一个block uint64_t cur = 0; - struct bbq_block * n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); + struct bbq_block * n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]); uint64_t ph_vsn = bbq_head_vsn(q, ph); if (BBQ_F_CHK_DROP_OLD(q->flags)) { cur = atomic_load(&n_blk->committed); // 生产者避免前进到上一轮中尚未完全提交的区块 - if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) + if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; } @@ -510,14 +529,14 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) { cur = atomic_load(&n_blk->consumed); uint64_t reserved; - uint64_t consumed_off = bbq_off(q, cur); + uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); if (consumed_vsn < ph_vsn || // 生产者赶上了消费者 (consumed_vsn == ph_vsn && consumed_off != q->bs)) { reserved = atomic_load(&n_blk->reserved); - if (bbq_off(q, reserved) == consumed_off) + if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; } @@ -529,54 +548,80 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) } // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。 - uint64_t new_vsn = set_cur_vsn(q, ph_vsn + 1); - fetch_max(&n_blk->committed, new_vsn); - fetch_max(&n_blk->allocated, new_vsn); + uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1); + bbq_fetch_max(&n_blk->committed, new_vsn); + bbq_fetch_max(&n_blk->allocated, new_vsn); // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1 - fetch_max(&q->phead, ph + 1); + bbq_fetch_max(&q->phead, ph + 1); return BBQ_SUCCESS; } -static uint32_t bbq_free_space_set(struct bbq * q, int32_t ret_status, uint64_t ph, struct bbq_block * blk_ph) +static uint32_t bbq_wait_consumed_set(struct bbq * q, uint64_t * ch_ptr, uint64_t * ph_ptr, struct bbq_block * blk_ph) { - if (ret_status == BBQ_QUEUE_FULL) + uint64_t ch; + uint64_t ph; + if (ch_ptr != NULL) { - return 0; + ch = *ch_ptr; + } + else + { + ch = atomic_load(&q->chead); } - uint64_t ch = atomic_load(&q->chead); - struct bbq_block * blk_ch = &(q->blocks[bbq_idx(q, ch)]); + if (ph_ptr != NULL) + { + ph = *ph_ptr; + } + else + { + ph = atomic_load(&q->phead); + } - // uint64_t ph_vsn = bbq_head_vsn(q, ph); - // uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t ph_idx = bbq_idx(q, ph); - uint64_t ch_idx = bbq_idx(q, ch); - uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed)); - uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved)); + uint64_t ph_idx = bbq_head_idx(q, ph); + uint64_t ch_idx = bbq_head_idx(q, ch); + uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed)); - if (BBQ_F_CHK_DROP_OLD(q->flags)) + struct bbq_block * blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); + uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved)); + + // "生产者"超过"消费者"的索引(即块)的个数 + uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { - // TODO - return 0; + // 这里idx_diff-1=-1也是正确。 + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); } - else + + uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t ph_vsn = bbq_head_vsn(q, ph); + + if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) { - // 生产者到消费者的距离 - uint64_t idx_diff = ph_idx >= ch_idx ? q->bn - ph_idx + ch_idx : ch_idx - ph_idx; - return (idx_diff - 1) * q->bs + (q->bs - committed_off + reserved_off); + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); } + + // 发生了覆盖 + if (ph_idx == ch_idx) + { + // 当前块以及之前已生产的都作废 + return 0; + } + + return (idx_diff - 1) * q->bs + committed_off; } /* 消息队列入队 */ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32_t n, uint32_t flag, - uint32_t * free_space) + uint32_t * wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || data == NULL) { - ret.status = BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + ret.status = bbq_errno; return ret; } @@ -584,7 +629,7 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 { // 获取当前phead,转为索引后获取到当前的blk uint64_t ph = atomic_load(&q->phead); - struct bbq_block * blk = &(q->blocks[bbq_idx(q, ph)]); + struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ph)]); struct bbq_queue_state_s state = allocate_entry(q, blk, n); switch (state.state) @@ -603,27 +648,31 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 if (pstate == BBQ_NO_ENTRY) { - ret.status = BBQ_QUEUE_FULL; + bbq_errno = BBQ_ERR_FULL; + ret.status = bbq_errno; } else if (pstate == BBQ_NOT_AVAILABLE) { - ret.status = BBQ_QUEUE_BUSY; + bbq_errno = BBQ_ERR_BUSY; + ret.status = bbq_errno; } else { - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; } break; } default: - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; break; } - if (free_space != NULL) + if (wait_consumed != NULL) { - *free_space = bbq_free_space_set(q, ret.status, ph, blk); + *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk); } return ret; @@ -632,24 +681,26 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 int bbq_enqueue(struct bbq * q, void * const * data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } int bbq_enqueue_elem(struct bbq * q, void const * data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } /* 更新成功 reserve成功的个数 */ -uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) +uint32_t bbq_reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) { // TODO:逻辑可以合并 if (n == 1) { // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (fetch_max(aotmic, reserved + 1) == reserved) + if (bbq_fetch_max(aotmic, reserved + 1) == reserved) { return 1; } @@ -663,13 +714,13 @@ uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) } } -struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n) +struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n) { while (true) { struct bbq_queue_state_s state; uint64_t reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) @@ -686,7 +737,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, } uint64_t committed = atomic_load(&block->committed); - uint64_t committed_off = bbq_off(q, committed); + uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; @@ -697,7 +748,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, if (committed_off != q->bs) { uint64_t allocated = atomic_load(&block->allocated); - if (bbq_off(q, allocated) != committed_off) + if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; @@ -705,7 +756,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, } uint32_t tmp = committed_off - reserved_off; - uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); if (reserved_cnt > 0) { // TODO:多entry时关注 state.state = BBQ_RESERVED; @@ -791,7 +842,7 @@ bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, u bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) { - uint64_t ch_idx = bbq_idx(q, ch); + uint64_t ch_idx = bbq_head_idx(q, ch); struct bbq_block * n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); uint64_t ch_vsn = bbq_head_vsn(q, ch); @@ -803,7 +854,7 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 if (committed_vsn < ver + (ch_idx == 0)) return false; - fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn)); + bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn)); } else { @@ -812,82 +863,90 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) // 消费者追上了生产者,下一块还未开始生产 return false; } - uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1); - fetch_max(&n_blk->consumed, new_vsn); - fetch_max(&n_blk->reserved, new_vsn); + uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1); + bbq_fetch_max(&n_blk->consumed, new_vsn); + bbq_fetch_max(&n_blk->reserved, new_vsn); } - fetch_max(&q->chead, ch + 1); + bbq_fetch_max(&q->chead, ch + 1); return true; } /* 消息队列出队 */ -static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type) +static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type, + uint32_t * wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { - ret.status = BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + ret.status = bbq_errno; return ret; } while (true) { uint64_t ch = atomic_load(&q->chead); - struct bbq_block * blk = &(q->blocks[bbq_idx(q, ch)]); - + struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; - state = reserve_entry(q, blk, n); + state = bbq_reserve_entry(q, blk, n); switch (state.state) { case BBQ_RESERVED: - if (consume_entry(q, &state.e, deq_data, data_type)) - { - ret.status = BBQ_OK; - ret.actual_burst = state.e.actual_burst; - return ret; - } - else + if (!consume_entry(q, &state.e, deq_data, data_type)) { continue; } + ret.status = BBQ_OK; + ret.actual_burst = state.e.actual_burst; + break; case BBQ_NO_ENTRY: - ret.status = BBQ_QUEUE_EMPTY; - return ret; + bbq_errno = BBQ_ERR_EMPTY; + ret.status = bbq_errno; + break; case BBQ_NOT_AVAILABLE: - ret.status = BBQ_QUEUE_BUSY; - return ret; + bbq_errno = BBQ_ERR_BUSY; + ret.status = bbq_errno; + break; case BBQ_BLOCK_DONE: if (advance_chead(q, ch, state.vsn)) { continue; } - else - { - ret.status = BBQ_QUEUE_EMPTY; - return ret; - } + bbq_errno = BBQ_ERR_EMPTY; + ret.status = bbq_errno; + break; default: - ret.status = BBQ_ERROR; - return ret; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; + break; + } + + if (wait_consumed != NULL) + { + *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk); } + + return ret; } } int bbq_dequeue(struct bbq * q, void ** data) { - struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE); + bbq_errno = BBQ_OK; + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } int bbq_dequeue_elem(struct bbq * q, void * data) { - struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE); + bbq_errno = BBQ_OK; + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } -uint32_t bbq_max_burst(struct bbq * q, uint32_t n) +static uint32_t bbq_max_burst(struct bbq * q, uint32_t n) { uint32_t burst = n; if (burst > q->bs) @@ -898,16 +957,19 @@ uint32_t bbq_max_burst(struct bbq * q, uint32_t n) return burst; } -static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n) +static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } if (!BBQ_F_CHK_VALUE(q->flags)) { - return BBQ_QUEUE_DATA_ERR; + bbq_errno = BBQ_ERR_NOT_SUPPORT; + return 0; } uint32_t burst = 0; @@ -918,7 +980,7 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D); + ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -930,11 +992,13 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab return ready; } -static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n) +static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } uint32_t burst = 0; @@ -945,7 +1009,7 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_ta while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D); + ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -958,16 +1022,19 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_ta } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n) +static uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } if (!BBQ_F_CHK_VALUE(q->flags)) { - return BBQ_QUEUE_DATA_ERR; + bbq_errno = BBQ_ERR_NOT_SUPPORT; + return 0; } uint32_t burst = 0; @@ -978,7 +1045,7 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_tabl while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, NULL); + ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -991,11 +1058,13 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_tabl } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space) +static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } uint32_t burst = 0; @@ -1006,7 +1075,7 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_ta while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, free_space); + ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -1025,8 +1094,8 @@ bool bbq_empty(struct bbq * q) uint64_t ph_vsn = bbq_head_vsn(q, phead); uint64_t ch_vsn = bbq_head_vsn(q, chead); - uint64_t ph_idx = bbq_idx(q, phead); - uint64_t ch_idx = bbq_idx(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); struct bbq_block * block; @@ -1038,14 +1107,14 @@ bool bbq_empty(struct bbq * q) block = &q->blocks[ph_idx]; if (ph_vsn == ch_vsn) { - if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) + if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) { return true; } } bbq_cursor reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); if (BBQ_F_CHK_DROP_OLD(q->flags) && ph_vsn > ch_vsn && reserved_off != q->bs) { @@ -1057,29 +1126,35 @@ bool bbq_empty(struct bbq * q) return false; } -uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n) +uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_enqueue_burst_one_dimensional(q, obj_table, n); + bbq_errno = BBQ_OK; + return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n) +uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, + uint32_t * wait_consumed) { - return bbq_enqueue_burst_two_dimensional(q, obj_table, n, NULL); + bbq_errno = BBQ_OK; + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space) +uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_enqueue_burst_two_dimensional(q, obj_table, n, free_space); + bbq_errno = BBQ_OK; + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n) +uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_dequeue_burst_two_dimensional(q, obj_table, n); + bbq_errno = BBQ_OK; + return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n) +uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_dequeue_burst_one_dimensional(q, obj_table, n); + bbq_errno = BBQ_OK; + return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed); } bool bbq_malloc_free_equal() @@ -1165,15 +1240,15 @@ void bbq_debug_block_print(struct bbq * q, struct bbq_block * block) bbq_cursor committed = atomic_load(&block->committed); bbq_cursor reserved = atomic_load(&block->reserved); bbq_cursor consumed = atomic_load(&block->consumed); - printf(" allocated:%lu committed:%lu reserved:%lu", bbq_off(q, allocated), bbq_off(q, committed), - bbq_off(q, reserved)); + printf(" allocated:%lu committed:%lu reserved:%lu", bbq_cur_off(q, allocated), bbq_cur_off(q, committed), + bbq_cur_off(q, reserved)); if (BBQ_F_CHK_DROP_OLD(q->flags)) { printf("\n"); } else { - printf(" consumed:%lu\n", bbq_off(q, consumed)); + printf(" consumed:%lu\n", bbq_cur_off(q, consumed)); } } @@ -1183,18 +1258,18 @@ void bbq_debug_struct_print(struct bbq * q) uint64_t phead = atomic_load(&q->phead); uint64_t chead = atomic_load(&q->chead); - printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs); - printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead)); + printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); + printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); - uint64_t ph_idx = bbq_idx(q, phead); - uint64_t ch_idx = bbq_idx(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); if (ph_idx != ch_idx) { printf("block[%lu]\n", ph_idx); bbq_debug_block_print(q, &(q->blocks[ph_idx])); } - printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead)); + printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead)); printf("block[%lu]\n", ch_idx); bbq_debug_block_print(q, &(q->blocks[ch_idx])); } \ No newline at end of file diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index 489b708..b1643f9 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -47,8 +47,8 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc)); MR_VERIFY_MALLOC(desc); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) - desc->tunnel_object = bbq_create(sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); +#ifdef BBQ_SPSC + desc->tunnel_object = bbq_create(symbol, sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); #else desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); @@ -81,7 +81,7 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl errout: if (desc->tunnel_object != NULL) { -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC bbq_destory(desc->tunnel_object); #else rte_ring_free(desc->tunnel_object); @@ -112,7 +112,7 @@ static int tunnel_delete(struct tunnel_desc * desc) } struct rte_mbuf * mbuf; -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC while (bbq_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) #else while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) @@ -128,7 +128,7 @@ static int tunnel_delete(struct tunnel_desc * desc) infra_rte_pktmbuf_free(desc->rt_buffer[i]); } -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC MR_VERIFY_2(bbq_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); #else MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); @@ -136,7 +136,7 @@ static int tunnel_delete(struct tunnel_desc * desc) rte_free(desc->en_buffer); rte_free(desc->rt_buffer); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC bbq_destory(desc->tunnel_object); #else rte_ring_free(desc->tunnel_object); diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h index ae630a0..0fd1334 100644 --- a/infra/src/vnode_common.h +++ b/infra/src/vnode_common.h @@ -26,7 +26,7 @@ struct tunnel_desc /* Tunnel Name */ char symbol[MR_SYMBOL_MAX]; -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC struct bbq * tunnel_object; #else /* Tunel Object, real object to hold pointers */ diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c index 78afc90..9c0261e 100644 --- a/infra/src/vnode_mirror.c +++ b/infra/src/vnode_mirror.c @@ -117,7 +117,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE * 2)); } +#ifdef BBQ_SPSC + uint32_t wait_consumed = 0; +#else unsigned int n_free_space = 0; +#endif unsigned int n_send = 0; bool is_acquire_credit_success = dist_tunnel_acquire_credits(prod->vnode, desc, (int32_t)n_to_send); @@ -125,8 +129,8 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons /* acquire credit, if failed, drop all the packets */ if (likely(is_acquire_credit_success)) { -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) - n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); +#ifdef BBQ_SPSC + n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &wait_consumed); #else n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); #endif @@ -174,7 +178,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons desc->total_len += n_send_len; /* q_len */ +#ifdef BBQ_SPSC + desc->q_len = wait_consumed; +#else desc->q_len = desc->tunnel_size - n_free_space; +#endif desc->q_len_avg += 0.2F * ((float)desc->q_len - desc->q_len_avg); // clear the buffer @@ -223,8 +231,8 @@ out: static inline int dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj, unsigned int nr_max_obj) { -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) - unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj); +#ifdef BBQ_SPSC + unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL); #else unsigned int nr_deq = rte_ring_sc_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL); #endif diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc index 7101e71..fae95a8 100644 --- a/infra/test/TestVNode.cc +++ b/infra/test/TestVNode.cc @@ -242,7 +242,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -256,7 +256,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -271,7 +271,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 2, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -286,7 +286,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 3, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -315,7 +315,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) EXPECT_EQ(prod_on_line_total, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(prod_deliver_total, 128); EXPECT_EQ(prod_missed_total, 1920); #else @@ -326,7 +326,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) /* on cons side */ struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat[0].on_line, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat[0].deliver, 128); EXPECT_EQ(cons_stat[0].missed, 1920); #else @@ -336,7 +336,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * deq_objs[TEST_MBUFS_COUNT]; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret, 128); #else EXPECT_EQ(deq_ret, 124); @@ -379,7 +379,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) EXPECT_EQ(enq_ret, 0); int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -397,7 +397,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) /* until here, we have 31 objects enqueue, so only 544 mbufs can be enqueue. */ int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret_1, 32); #else EXPECT_EQ(deq_ret_1, 31); @@ -405,7 +405,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod); EXPECT_EQ(prod_stat->on_line, 1024); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(prod_stat->deliver, 32); EXPECT_EQ(prod_stat->missed, 992); #else @@ -415,7 +415,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat->on_line, 1024); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat->deliver, 32); EXPECT_EQ(cons_stat->missed, 992); #else @@ -434,7 +434,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) /* get the packet needs to be free */ int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret_3, 480); #else EXPECT_EQ(rt_ret_3, 481); @@ -449,7 +449,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) rte_pktmbuf_free_bulk(deq_objs, rt_ret_4); int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret_2, 32); #else EXPECT_EQ(deq_ret_2, 31); @@ -459,7 +459,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(prod_stat->on_line, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(prod_stat->deliver, 64); EXPECT_EQ(prod_stat->missed, 1984); #else @@ -468,7 +468,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) #endif EXPECT_EQ(cons_stat->on_line, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat->deliver, 64); EXPECT_EQ(cons_stat->missed, 1984); #else @@ -533,7 +533,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* retrieve the drop packets */ struct rte_mbuf * rt_objs[128] = {}; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objs, 128); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 32); #else EXPECT_EQ(rt_ret, 33); @@ -550,7 +550,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* retrieve the drop packets */ rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objs, 128); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 32); #else EXPECT_EQ(rt_ret, 33); @@ -560,7 +560,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* dequeue */ struct rte_mbuf * deq_objs[128] = {}; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret, 64); #else EXPECT_EQ(deq_ret, 62); @@ -607,7 +607,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) EXPECT_EQ(enq_ret, 0); int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 0); #else EXPECT_EQ(rt_ret, 1); @@ -640,7 +640,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) rte_pktmbuf_free_bulk(deq_objs, 512); int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret, 512); #else EXPECT_EQ(deq_ret, 511); @@ -1055,7 +1055,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC // 32/8=4 EXPECT_EQ(cons_stat[0].q_len_max, 4); EXPECT_LE(cons_stat[0].q_len_avg_max, 4); @@ -1076,7 +1076,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(cons_stat[0].on_line, 32); EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat[0].q_len_max, 4); EXPECT_LE(cons_stat[0].q_len_avg_max, 4); // TODO:check??? #else -- cgit v1.2.3 From d2554a6da257757e5f981059f428b56fa515167e Mon Sep 17 00:00:00 2001 From: liuyu Date: Wed, 3 Jul 2024 02:52:33 -0400 Subject: perf: bbq性能优化 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/include/bbq.h | 127 +++++++-- infra/src/bbq.c | 667 ++++++++++++++++++++++++----------------------- infra/src/vnode_common.c | 15 +- 3 files changed, 455 insertions(+), 354 deletions(-) diff --git a/infra/include/bbq.h b/infra/include/bbq.h index d8afb3d..835a574 100644 --- a/infra/include/bbq.h +++ b/infra/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: liuyu@geedgenetworks.com - * @LastEditTime: 2024-06-25 16:49:38 + * @LastEditTime: 2024-07-03 02:19:57 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -15,32 +15,54 @@ // C #include typedef atomic_uint_fast64_t aotmic_uint64; -typedef aotmic_uint64 bbq_cursor; -typedef aotmic_uint64 bbq_head; #else // C++ 为了兼容gtest测试 -using bbq_cursor = std::atomic; -using bbq_head = std::atomic; using aotmic_uint64 = std::atomic; #endif #define BBQ_SOCKET_ID_ANY -1 -#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) #define BBQ_SYMBOL_MAX 64 +#define BBQ_CACHE_LINE 64 +#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(BBQ_CACHE_LINE))) + +struct bbq_atomic64 +{ + union { + volatile uint64_t s; // single使用该字段 + aotmic_uint64 m; + }; +}; + +struct bbq_head +{ + struct bbq_atomic64 value; +} __BBQ_CACHE_ALIGNED; struct bbq_block { - bbq_cursor committed; // 已提交(version|offset) - bbq_cursor allocated; // 已分配(version|offset) - bbq_cursor reserved; // 已预留(version|offset) - bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char * entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size + struct bbq_atomic64 committed; // 生产者,已提交(version|offset) + struct bbq_atomic64 allocated; // 生产者,已分配(version|offset) + struct bbq_atomic64 reserved; // 消费者,已预留(version|offset) + struct bbq_atomic64 consumed; // 消费者,已消费(version|offset)注:在drop-old模式下没用到 + char * entries __BBQ_CACHE_ALIGNED; // 存储大小可变的entry,每个块分配空间:bs * entry_size +} __BBQ_CACHE_ALIGNED; + +typedef void * (*bbq_malloc_f)(int32_t socket_id, size_t size); +typedef void (*bbq_free_f)(void * ptr, size_t size); + +struct bbq_mempool +{ + char * ptr; // 内存池起始地址 + size_t off; // 已使用的偏移大小 + size_t size; // 内存池总大小 + + bbq_malloc_f malloc_f; // 申请内存的函数,默认为malloc + bbq_free_f free_f; // 申请内存的函数,默认为free } __BBQ_CACHE_ALIGNED; struct bbq { char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; - int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 uint32_t bn; // blocks的个数 uint32_t bs; // blocks.entries的个数 @@ -50,15 +72,33 @@ struct bbq uint64_t idx_mask; // idx_bits偏移后的掩码 uint64_t off_mask; // off_bits偏移后的掩码 uint64_t entry_size; // blocks.entries里每个entry的大小 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + bool prod_single; // 如果为单生产者或单消费者,则single为true + bool cons_single; // 如果为单生产者或单消费者,则single为true - struct bbq_block * blocks; // bn大小的数组 + struct bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + struct bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + + struct + { + struct bbq_atomic64 n_enq; + struct bbq_atomic64 n_deq; + } __BBQ_CACHE_ALIGNED stat; + + struct bbq_mempool memory_pool; // 仅在初始化和调试时会读写 + struct bbq_block * blocks; // bn大小的数组 } __BBQ_CACHE_ALIGNED; #define BBQ_F_DEFAULT 0x0 + #define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ #define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ + +#define BBQ_F_SP_ENQ 0x0004 +#define BBQ_F_MP_ENQ BBQ_F_DEFAULT +#define BBQ_F_SC_DEQ 0x0008 +#define BBQ_F_MC_DEQ BBQ_F_DEFAULT +#define BBQ_F_ENABLE_STAT 0x0010 +#define BBQ_F_DISABLE_STAT BBQ_F_DEFAULT /** * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 * 对应入队函数:bbq_enqueue、bbq_enqueue_burst @@ -73,8 +113,18 @@ struct bbq * 当检测到不支持多numa,将转为malloc分配内存。 * @param[in] flags * 设置入队策略: - * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 - * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * - BBQ_F_RETRY_NEW:队列满了当前入队失败(默认)。 + * 设置生产者模式: + * - BBQ_F_SP_ENQ:单生产者 + * - BBQ_F_MP_ENQ:多生产者(默认) + * 设置消费者模式: + * - BBQ_F_SC_DEQ:单消费者 + * - BBQ_F_MC_DEQ:多消费者(默认) + * 设置统计功能: + * 注:目前仅retry new模式下支持统计功能 + * - BBQ_F_ENABLE_STAT:开启统计功能 + * - BBQ_F_DISABLE_STAT:关闭统计功能(默认) * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 * NULL:创建失败,可通过bbq_errno分析具体错误原因: @@ -82,8 +132,10 @@ struct bbq * - BBQ_ERR_ALLOC:申请内存失败 * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 * - BBQ_ERR_INPUT_NULL:name传入空指针 + * - BBQ_ERR_STAT_NOT_SUPPORT:drop old模式下不支持 */ -extern struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags); +extern struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags, bbq_malloc_f malloc_f, + bbq_free_f free_f); /** * 消息队列单个指针入队 @@ -180,8 +232,18 @@ extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, * 当检测到不支持多numa,将转为malloc分配内存。 * @param[in] flags * 设置入队策略: - * 1)BBQ_F_RETRY_NEW(默认):队列满了当前入队失败。 - * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * - BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 + * - BBQ_F_RETRY_NEW:队列满了当前入队失败(默认)。 + * 设置生产者模式: + * - BBQ_F_SP_ENQ:单生产者 + * - BBQ_F_MP_ENQ:多生产者(默认) + * 设置消费者模式: + * - BBQ_F_SC_DEQ:单消费者 + * - BBQ_F_MC_DEQ:多消费者(默认) + * 设置统计功能: + * 注:目前仅retry new模式下支持统计功能 + * - BBQ_F_ENABLE_STAT:开启统计功能 + * - BBQ_F_DISABLE_STAT:关闭统计功能(默认) * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 * NULL:创建失败。可通过bbq_errno分析具体错误原因: @@ -189,8 +251,10 @@ extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, * - BBQ_ERR_ALLOC:申请内存失败 * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 * - BBQ_ERR_INPUT_NULL:name传入空指针 + * - BBQ_ERR_STAT_NOT_SUPPORT:drop old模式下不支持 */ -extern struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f); /** * 消息队列单个数据入队(指针指向的数据将被拷贝) @@ -293,13 +357,22 @@ extern void bbq_destory(struct bbq * q); */ extern void bbq_debug_struct_print(struct bbq * q); +/** + * 打印指定块信息(调试用)。 + * + * @param[in] q + * 队列指针 + */ +extern void bbq_debug_block_print(struct bbq * q, struct bbq_block * block); + // 错误码 -#define BBQ_OK 0 // 成功 -#define BBQ_ERR -1 // 通用错误 -#define BBQ_ERR_ALLOC -2 // 内存分配失败 -#define BBQ_ERR_INPUT_NULL -3 // 传入空指针 -#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方 -#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围 +#define BBQ_OK 0 // 成功 +#define BBQ_ERR -1 // 通用错误,无法分类时使用 +#define BBQ_ERR_ALLOC -2 // 内存分配失败 +#define BBQ_ERR_INPUT_NULL -3 // 传入空指针 +#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方 +#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围 +#define BBQ_ERR_STAT_NOT_SUPPORT -6 // 不支持统计 #define BBQ_ERR_FULL -101 // 队列已满(入队失败) #define BBQ_ERR_BUSY -102 // 队列忙碌中(入队或出队失败) diff --git a/infra/src/bbq.c b/infra/src/bbq.c index 6d1a954..0ab391f 100644 --- a/infra/src/bbq.c +++ b/infra/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-25 16:49:28 + * @LastEditTime: 2024-07-03 02:20:07 * @Email: liuyu@geedgenetworks.com * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -17,7 +17,10 @@ // 判断flags标记位 #define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) -#define BBQ_F_CHK_VALUE(flags) (flags & BBQ_F_COPY_VALUE) +#define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE) +#define BBQ_F_CHK_SP_ENQ(flags) (flags & BBQ_F_SP_ENQ) +#define BBQ_F_CHK_SC_DEQ(flags) (flags & BBQ_F_SC_DEQ) +#define BBQ_F_CHK_STAT_ENABLE(flags) (flags & BBQ_F_ENABLE_STAT) // 避免无用参数的编译告警 #define AVOID_WARNING(param) ((void)param) @@ -88,80 +91,95 @@ static inline uint64_t bbq_set_cur_vsn(struct bbq * q, uint64_t ver) return ver << q->off_bits; } -// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -enum bbq_module -{ - BBQ_MODULE_MAIN = 0, - BBQ_MODULE_BLOCK_NB, - BBQ_MODULE_BLOCK_ENTRY, - BBQ_MODULE_MAX, -}; - -#ifdef BBQ_MEMORY -#define BBQ_MEM_MAGIC 0xFF -struct bbq_memory_s +uint64_t bbq_atomic64_load(struct bbq_atomic64 * atomic, bool single) { - aotmic_uint64 malloc_cnt; - aotmic_uint64 malloc_size; - aotmic_uint64 free_cnt; - aotmic_uint64 free_size; -}; -struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0}; -#endif - -static void * bbq_malloc(enum bbq_module module, int socket_id, size_t size) -{ - void * ptr = NULL; - if (socket_id >= 0) + if (single) { - ptr = numa_alloc_onnode(size, 0); + return atomic->s; } else { - ptr = malloc(size); + return atomic_load(&atomic->m); } -#ifdef BBQ_MEMORY - if (ptr != NULL) +} + +void bbq_atomic64_store(struct bbq_atomic64 * atomic, uint64_t value, bool single) +{ + if (single) { - atomic_fetch_add(&bbq_memory_g[module].malloc_cnt, 1); - atomic_fetch_add(&bbq_memory_g[module].malloc_size, size); + atomic->s = value; + } + else + { + atomic_store(&atomic->m, value); } -#else - AVOID_WARNING(module); -#endif - return ptr; } - -static void bbq_free(enum bbq_module module, int socket_id, void * ptr, size_t size) +static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 * atomic, uint64_t value, bool single) { - if (socket_id >= 0) + if (single) { - numa_free(ptr, size); + uint64_t old = atomic->s; + atomic->s += value; + return old; } else { - free(ptr); + return atomic_fetch_add(&atomic->m, value); } +} + +// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 #ifdef BBQ_MEMORY - if (ptr != NULL) +#define BBQ_MEM_MAGIC 0xFF +struct bbq_memory_s +{ + uint64_t malloc_cnt; + uint64_t malloc_size; + uint64_t free_cnt; + uint64_t free_size; +}; +struct bbq_memory_s bbq_memory_g = {0}; +#endif + +void * bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size) +{ +#ifdef BBQ_MEMORY + bbq_memory_g.malloc_cnt++; + bbq_memory_g.malloc_size += size; +#endif + return aligned_alloc(BBQ_CACHE_LINE, size); +} + +void bbq_free_def_callback(void * ptr, size_t size __attribute__((unused))) +{ +#ifdef BBQ_MEMORY + if (ptr) { - atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1); - atomic_fetch_add(&bbq_memory_g[module].free_size, size); + bbq_memory_g.free_cnt++; + bbq_memory_g.free_size += size; } -#else - AVOID_WARNING(module); #endif + free(ptr); } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t bbq_fetch_max(aotmic_uint64 * atom, uint64_t upd) +uint64_t bbq_fetch_max(struct bbq_atomic64 * atomic, uint64_t upd, bool single) { uint64_t old_value = 0; - do + + if (single) { - old_value = atomic_load(atom); - } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd)); + old_value = atomic->s; + atomic->s = upd; + } + else + { + do + { + old_value = atomic_load(&atomic->m); + } while (old_value < upd && !atomic_compare_exchange_weak(&atomic->m, &old_value, upd)); + } return old_value; } @@ -179,7 +197,7 @@ bool bbq_check_power_of_two(uint32_t n) /* 根据entries大小返回合理的block个数 * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ -uint32_t bbq_block_number_calc(uint32_t entries) +static uint32_t bbq_block_number_calc(uint32_t entries) { double log_entries = log2((double)entries); uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 @@ -188,16 +206,61 @@ uint32_t bbq_block_number_calc(uint32_t entries) return n; } +int bbq_bnbs_calc(uint32_t entries, uint32_t * bn, uint32_t * bs) +{ + if (bn == NULL || bs == NULL) + { + bbq_errno = BBQ_ERR_INPUT_NULL; + return bbq_errno; + } + + if (entries <= 1) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; + return bbq_errno; + } + + if (bbq_check_power_of_two(entries) == false) + { + bbq_errno = BBQ_ERR_POWER_OF_TWO; + return bbq_errno; + } + + *bn = bbq_block_number_calc(entries); + *bs = entries / *bn; + + return BBQ_OK; +} + +void * bbq_malloc_from_pool(struct bbq * q, size_t size) +{ + if (q->memory_pool.size - q->memory_pool.off < size) + { + return NULL; + } + + char * mem = q->memory_pool.ptr + q->memory_pool.off; + q->memory_pool.off += size; + + return mem; +} + /* 块初始化 */ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) { + size_t size = 0; + #ifdef BBQ_MEMORY - // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); + // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题 + size = (q->bs + 1) * q->entry_size; + block->entries = bbq_malloc_from_pool(q, size); char * last_entry = block->entries + q->entry_size * q->bs; + memset(block->entries, 0, size); memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); + size = q->bs * q->entry_size; + block->entries = bbq_malloc_from_pool(q, size); + memset(block->entries, 0, size); #endif if (block->entries == NULL) @@ -206,24 +269,15 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) return bbq_errno; } - block->committed = ATOMIC_VAR_INIT(0); - block->allocated = ATOMIC_VAR_INIT(0); - block->reserved = ATOMIC_VAR_INIT(0); - block->consumed = ATOMIC_VAR_INIT(0); - if (cursor_init) { // block数组里,除了第一块之外需要设置 - block->committed = ATOMIC_VAR_INIT(q->bs); - block->allocated = ATOMIC_VAR_INIT(q->bs); - block->reserved = ATOMIC_VAR_INIT(q->bs); - if (BBQ_F_CHK_DROP_OLD(q->flags)) + bbq_atomic64_store(&block->committed, q->bs, q->prod_single); + bbq_atomic64_store(&block->allocated, q->bs, q->prod_single); + bbq_atomic64_store(&block->reserved, q->bs, q->cons_single); + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed - } - else - { - block->consumed = ATOMIC_VAR_INIT(q->bs); + bbq_atomic64_store(&block->consumed, q->bs, q->cons_single); } } @@ -236,9 +290,9 @@ void block_destory(struct bbq * q, struct bbq_block * block) if (block->entries) { #ifdef BBQ_MEMORY - bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, (q->bs + 1) * q->entry_size); + q->memory_pool.free_f(block->entries, (q->bs + 1) * q->entry_size); #else - bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, q->bs * q->entry_size); + q->memory_pool.free_f(block->entries, q->bs * q->entry_size); #endif block->entries = NULL; } @@ -262,14 +316,12 @@ static unsigned bbq_ceil_log2(uint64_t x) return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1; } -// 0----------------------------------------------------------------------------- /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ static struct bbq * __bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, - uint32_t flags) + uint32_t flags, bbq_malloc_f malloc_f, bbq_free_f free_f) { int ret = 0; - bbq_errno = BBQ_OK; - + size_t size = 0; if (bbq_check_power_of_two(bn) == false) { bbq_errno = BBQ_ERR_POWER_OF_TWO; @@ -296,39 +348,73 @@ static struct bbq * __bbq_create_bnbs(const char * name, uint32_t bn, uint32_t b if (numa_available() < 0) { - // 不支持numa,设置 + // 不支持numa socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq * q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q)); + if (BBQ_F_CHK_DROP_OLD(flags) && BBQ_F_CHK_STAT_ENABLE(flags)) + { + bbq_errno = BBQ_ERR_STAT_NOT_SUPPORT; + return NULL; + } + + if (malloc_f == NULL) + { + malloc_f = bbq_malloc_def_callback; + } + + if (free_f == NULL) + { + free_f = bbq_free_def_callback; + } + + uint32_t all_size = 0; +#ifdef BBQ_MEMORY + all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * (bs + 1) * obj_size; +#else + all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * bs * obj_size; +#endif + struct bbq * q = malloc_f(socket_id, all_size); if (q == NULL) { bbq_errno = BBQ_ERR_ALLOC; return NULL; } - memset(q, 0, sizeof(*q)); + + memset(q, 0, all_size); + q->memory_pool.size = all_size; + q->memory_pool.ptr = (char *)q; + q->memory_pool.off += sizeof(struct bbq); + q->memory_pool.malloc_f = malloc_f; + q->memory_pool.free_f = free_f; + ret = snprintf(q->name, sizeof(q->name), "%s", name); q->bn = bn; q->bs = bs; q->entry_size = obj_size; q->socket_id = socket_id; - q->phead = ATOMIC_VAR_INIT(0); - q->chead = ATOMIC_VAR_INIT(0); + if (BBQ_F_CHK_SP_ENQ(flags)) + { + q->prod_single = true; + } + if (BBQ_F_CHK_SC_DEQ(flags)) + { + q->cons_single = true; + } q->flags = flags; - q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + size = bn * sizeof(*q->blocks); + q->blocks = bbq_malloc_from_pool(q, size); if (q->blocks == NULL) { bbq_errno = BBQ_ERR_ALLOC; goto error; } - memset(q->blocks, 0, sizeof(*q->blocks)); + memset(q->blocks, 0, size); for (uint32_t i = 0; i < bn; ++i) { - // 第一个block不需要设置cursor_init - bool cursor_init = (i == 0 ? false : true); - ret = block_init(q, &(q->blocks[i]), cursor_init); + ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true)); if (ret != BBQ_OK) { goto error; @@ -336,7 +422,13 @@ static struct bbq * __bbq_create_bnbs(const char * name, uint32_t bn, uint32_t b } q->idx_bits = bbq_ceil_log2(bn); - q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 + uint32_t off_bits = bbq_ceil_log2(bs); + if (off_bits < 13) + { + off_bits = 13; + } + q->off_bits = + off_bits; // 多线程同时FAA,可能会超过最大索引,因此多分配一些空间,防止FAA溢出 TODO:多少合适?ver溢出问题? q->idx_mask = (1 << q->idx_bits) - 1; q->off_mask = (1 << q->off_bits) - 1; @@ -348,59 +440,52 @@ error: return NULL; } -struct bbq * bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) +/* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */ +struct bbq * bbq_create_with_bnbs(const char * name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { - return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + bbq_errno = BBQ_OK; + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); } -struct bbq * bbq_create_bnbs_elem(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, - uint32_t flags) +/* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */ +struct bbq * bbq_create_elem_with_bnbs(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, + uint32_t flags, bbq_malloc_f malloc_f, bbq_free_f free_f) { - return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + bbq_errno = BBQ_OK; + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) +struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; - if (count <= 1) - { - bbq_errno = BBQ_ERR_OUT_OF_RANGE; - return NULL; - } + uint32_t bn = 0; + uint32_t bs = 0; - if (bbq_check_power_of_two(count) == false) + if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) { - bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - uint32_t bn = bbq_block_number_calc(count); - uint32_t bs = count / bn; - - return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); } -struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags) +struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags, bbq_malloc_f malloc_f, + bbq_free_f free_f) { bbq_errno = BBQ_OK; - if (count <= 1) - { - bbq_errno = BBQ_ERR_OUT_OF_RANGE; - return NULL; - } + uint32_t bn = 0; + uint32_t bs = 0; - if (bbq_check_power_of_two(count) == false) + if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) { - bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - uint32_t bn = bbq_block_number_calc(count); - uint32_t bs = count / bn; - - return bbq_create_bnbs(name, bn, bs, socket_id, flags); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ @@ -411,23 +496,17 @@ void bbq_destory(struct bbq * q) return; } - for (uint32_t i = 0; i < q->bn; ++i) - { - block_destory(q, &(q->blocks[i])); - } - - bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); - bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q)); + q->memory_pool.free_f(q->memory_pool.ptr, q->memory_pool.size); } #define BBQ_DATA_TYPE_SINGLE 0x0 #define BBQ_DATA_TYPE_ARRAY_1D 0x1 #define BBQ_DATA_TYPE_ARRAY_2D 0x2 -void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, uint32_t data_type) +void bbq_commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, uint32_t data_type) { size_t idx = e->off * q->entry_size; - if (BBQ_F_CHK_VALUE(q->flags)) + if (BBQ_F_CHK_COPY_VALUE(q->flags)) { // 数据入队列 switch (data_type) @@ -448,6 +527,7 @@ void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, break; } default: + bbq_errno = BBQ_ERR; break; } } @@ -462,30 +542,32 @@ void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); break; case BBQ_DATA_TYPE_ARRAY_1D: - break; default: + bbq_errno = BBQ_ERR; break; } } - atomic_fetch_add(&e->block->committed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst, q->prod_single); } -struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block, uint32_t n) +struct bbq_queue_state_s bbq_allocate_entry(struct bbq * q, uint64_t ph, uint32_t n) { struct bbq_queue_state_s state = {0}; - if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) + uint64_t ph_idx = bbq_head_idx(q, ph); + bool prod_single = q->prod_single; + + struct bbq_block * block = &(q->blocks[ph_idx]); + if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated, prod_single)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } - uint64_t old = atomic_fetch_add(&block->allocated, n); - uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed)); - - // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 + uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n, prod_single); uint64_t cur_vsn = bbq_cur_vsn(q, old); uint64_t cur_off = bbq_cur_off(q, old); - if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) + + if (cur_off >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; @@ -511,15 +593,17 @@ struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) { - // 获取下一个block uint64_t cur = 0; + // 获取下一个block struct bbq_block * n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]); uint64_t ph_vsn = bbq_head_vsn(q, ph); + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; if (BBQ_F_CHK_DROP_OLD(q->flags)) { - cur = atomic_load(&n_blk->committed); - // 生产者避免前进到上一轮中尚未完全提交的区块 + cur = bbq_atomic64_load(&n_blk->committed, prod_single); + // 生产者head避免覆盖上一轮尚未完全提交的区块 if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; @@ -527,15 +611,15 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) } else { - cur = atomic_load(&n_blk->consumed); + cur = bbq_atomic64_load(&n_blk->consumed, cons_single); uint64_t reserved; uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); - if (consumed_vsn < ph_vsn || // 生产者赶上了消费者 - (consumed_vsn == ph_vsn && consumed_off != q->bs)) + if (consumed_vsn < ph_vsn || (consumed_vsn == ph_vsn && consumed_off != q->bs)) { - reserved = atomic_load(&n_blk->reserved); + // 生产者赶上了消费者 + reserved = bbq_atomic64_load(&n_blk->reserved, cons_single); if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; @@ -547,76 +631,40 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) } } - // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。 + // 用head的version值初始化下一个块,version在高位,version+1,index或offset也会被清零 uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1); - bbq_fetch_max(&n_blk->committed, new_vsn); - bbq_fetch_max(&n_blk->allocated, new_vsn); - - // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1 - bbq_fetch_max(&q->phead, ph + 1); + // 其他线程完成了head更新,当前bbq_fetch_max不会再更新,可能在以下两种情况: + // 1)实际ph_vsn与本次要更新的ph_vsn相同。 + // 2)当前ph_vsn已经落后于实际的ph_vsn(且移动到了下一轮), + bbq_fetch_max(&n_blk->committed, new_vsn, prod_single); + bbq_fetch_max(&n_blk->allocated, new_vsn, prod_single); + + // ph+1,当超过索引范围,进入下一轮时,version会自动+1 + bbq_fetch_max(&q->phead.value, ph + 1, prod_single); return BBQ_SUCCESS; } -static uint32_t bbq_wait_consumed_set(struct bbq * q, uint64_t * ch_ptr, uint64_t * ph_ptr, struct bbq_block * blk_ph) +bool bbq_empty(struct bbq * q) { - uint64_t ch; - uint64_t ph; - if (ch_ptr != NULL) - { - ch = *ch_ptr; - } - else - { - ch = atomic_load(&q->chead); - } - - if (ph_ptr != NULL) - { - ph = *ph_ptr; - } - else - { - ph = atomic_load(&q->phead); - } - - uint64_t ph_idx = bbq_head_idx(q, ph); - uint64_t ch_idx = bbq_head_idx(q, ch); - uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed)); - - struct bbq_block * blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); - uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved)); - - // "生产者"超过"消费者"的索引(即块)的个数 - uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; - if (!BBQ_F_CHK_DROP_OLD(q->flags)) - { - // 这里idx_diff-1=-1也是正确。 - return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); - } - - uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t ph_vsn = bbq_head_vsn(q, ph); - - if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) - { - return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); - } + return bbq_atomic64_load(&q->stat.n_enq, q->prod_single) == bbq_atomic64_load(&q->stat.n_deq, q->cons_single); +} - // 发生了覆盖 - if (ph_idx == ch_idx) - { - // 当前块以及之前已生产的都作废 - return 0; - } +static uint32_t bbq_wait_consumed_get(struct bbq * q, uint64_t enq_update, uint64_t deq_update) +{ + uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->stat.n_enq, q->prod_single) : enq_update; + uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->stat.n_deq, q->cons_single) : deq_update; - return (idx_diff - 1) * q->bs + committed_off; + return enq_now - deq_now; } +//----------------------------------------------------------------- /* 消息队列入队 */ -static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32_t n, uint32_t flag, +static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32_t n, uint32_t data_type, uint32_t * wait_consumed) { + uint64_t enq_update = 0; struct bbq_status ret = {.status = 0, .actual_burst = 0}; + bool prod_single = q->prod_single; if (q == NULL || data == NULL) { @@ -627,17 +675,21 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 while (true) { - // 获取当前phead,转为索引后获取到当前的blk - uint64_t ph = atomic_load(&q->phead); - struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ph)]); - struct bbq_queue_state_s state = allocate_entry(q, blk, n); + uint64_t ph = bbq_atomic64_load(&q->phead.value, prod_single); + struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n); switch (state.state) { case BBQ_ALLOCATED: - commit_entry(q, &state.e, data, flag); + bbq_commit_entry(q, &state.e, data, data_type); ret.actual_burst = state.e.actual_burst; ret.status = BBQ_OK; + + if (BBQ_F_CHK_STAT_ENABLE(q->flags)) + { + enq_update = + bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst, prod_single) + state.e.actual_burst; + } break; case BBQ_BLOCK_DONE: { enum bbq_queue_state pstate = advance_phead(q, ph); @@ -670,9 +722,9 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 break; } - if (wait_consumed != NULL) + if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { - *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk); + *wait_consumed = bbq_wait_consumed_get(q, enq_update, 0); } return ret; @@ -694,49 +746,51 @@ int bbq_enqueue_elem(struct bbq * q, void const * data) } /* 更新成功 reserve成功的个数 */ -uint32_t bbq_reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) +uint32_t bbq_reserve_update(struct bbq_atomic64 * atomic, uint64_t reserved, uint32_t n, bool single) { - // TODO:逻辑可以合并 - if (n == 1) + if (single) { - // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (bbq_fetch_max(aotmic, reserved + 1) == reserved) - { - return 1; - } - - return 0; + atomic->s += n; + return n; } else { - bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n); - return ret == true ? n : 0; + // TODO:合并逻辑? + if (n == 1) + { + // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 + if (bbq_fetch_max(atomic, reserved + 1, single) == reserved) + { + return 1; + } + + return 0; + } + else + { + // 不能用fetch_max??? + bool ret = atomic_compare_exchange_weak(&atomic->m, &reserved, reserved + n); + return ret == true ? n : 0; + } } } struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n) { + uint32_t cont = 0; + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + while (true) { struct bbq_queue_state_s state; - uint64_t reserved = atomic_load(&block->reserved); + uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single); uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) { - uint64_t consumed = atomic_load(&block->consumed); - // TODO:bug?? ver溢出了,在drop old模式下使用了vsn,应该传入consumed的vsn合理? - // TODO:这个情况可能出现? - if (!BBQ_F_CHK_DROP_OLD(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) - { - // consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到 - state.state = BBQ_BLOCK_DONE; - state.vsn = reserved_svn; - return state; - } - - uint64_t committed = atomic_load(&block->committed); + uint64_t committed = bbq_atomic64_load(&block->committed, prod_single); uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { @@ -747,7 +801,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * bl // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 if (committed_off != q->bs) { - uint64_t allocated = atomic_load(&block->allocated); + uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single); if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; @@ -756,9 +810,9 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * bl } uint32_t tmp = committed_off - reserved_off; - uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n, q->cons_single); if (reserved_cnt > 0) - { // TODO:多entry时关注 + { state.state = BBQ_RESERVED; state.e.actual_burst = reserved_cnt; state.e.block = block; @@ -769,7 +823,8 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * bl } else { - // 如果不等于代表block.reserved被其他线程Reserved了 + // 已经被其他线程更新过了,当前数据为旧数据,需要重新获取 + cont++; continue; } } @@ -783,8 +838,10 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * bl bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, uint32_t data_type) { size_t idx = e->off * q->entry_size; + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; - if (BBQ_F_CHK_VALUE(q->flags)) + if (BBQ_F_CHK_COPY_VALUE(q->flags)) { switch (data_type) { @@ -825,7 +882,7 @@ bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, u if (BBQ_F_CHK_DROP_OLD(q->flags)) { // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? - allocated = atomic_load(&e->block->allocated); + allocated = bbq_atomic64_load(&e->block->allocated, prod_single); // 预留的entry所在的块,已经被新生产的数据赶上了 if (bbq_cur_vsn(q, allocated) != e->vsn) { @@ -834,7 +891,7 @@ bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, u } else { - atomic_fetch_add(&e->block->consumed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst, cons_single); } return true; @@ -844,17 +901,22 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) { uint64_t ch_idx = bbq_head_idx(q, ch); struct bbq_block * n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); - + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t committed = atomic_load(&n_blk->committed); + uint64_t committed = bbq_atomic64_load(&n_blk->committed, prod_single); uint64_t committed_vsn = bbq_cur_vsn(q, committed); + if (BBQ_F_CHK_DROP_OLD(q->flags)) { // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 if (committed_vsn < ver + (ch_idx == 0)) + { return false; - bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn)); + } + + bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn), cons_single); } else { @@ -864,11 +926,11 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) return false; } uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1); - bbq_fetch_max(&n_blk->consumed, new_vsn); - bbq_fetch_max(&n_blk->reserved, new_vsn); + bbq_fetch_max(&n_blk->consumed, new_vsn, cons_single); + bbq_fetch_max(&n_blk->reserved, new_vsn, cons_single); } - bbq_fetch_max(&q->chead, ch + 1); + bbq_fetch_max(&q->chead.value, ch + 1, cons_single); return true; } @@ -876,6 +938,8 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type, uint32_t * wait_consumed) { + uint64_t deq_update = 0; + bool cons_single = q->cons_single; struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { @@ -886,7 +950,7 @@ static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t while (true) { - uint64_t ch = atomic_load(&q->chead); + uint64_t ch = bbq_atomic64_load(&q->chead.value, cons_single); struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; state = bbq_reserve_entry(q, blk, n); @@ -900,6 +964,12 @@ static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t } ret.status = BBQ_OK; ret.actual_burst = state.e.actual_burst; + + if (BBQ_F_CHK_STAT_ENABLE(q->flags)) + { + deq_update = + bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst, cons_single) + state.e.actual_burst; + } break; case BBQ_NO_ENTRY: bbq_errno = BBQ_ERR_EMPTY; @@ -923,9 +993,9 @@ static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t break; } - if (wait_consumed != NULL) + if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { - *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk); + *wait_consumed = bbq_wait_consumed_get(q, 0, deq_update); } return ret; @@ -966,7 +1036,7 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab return 0; } - if (!BBQ_F_CHK_VALUE(q->flags)) + if (!BBQ_F_CHK_COPY_VALUE(q->flags)) { bbq_errno = BBQ_ERR_NOT_SUPPORT; return 0; @@ -1031,7 +1101,7 @@ static uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * o return 0; } - if (!BBQ_F_CHK_VALUE(q->flags)) + if (!BBQ_F_CHK_COPY_VALUE(q->flags)) { bbq_errno = BBQ_ERR_NOT_SUPPORT; return 0; @@ -1087,45 +1157,6 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * return ready; } -bool bbq_empty(struct bbq * q) -{ - uint64_t phead = atomic_load(&q->phead); - uint64_t chead = atomic_load(&q->chead); - - uint64_t ph_vsn = bbq_head_vsn(q, phead); - uint64_t ch_vsn = bbq_head_vsn(q, chead); - uint64_t ph_idx = bbq_head_idx(q, phead); - uint64_t ch_idx = bbq_head_idx(q, chead); - - struct bbq_block * block; - - if (ph_idx != ch_idx) - { - return false; - } - - block = &q->blocks[ph_idx]; - if (ph_vsn == ch_vsn) - { - if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) - { - return true; - } - } - - bbq_cursor reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_cur_off(q, reserved); - - if (BBQ_F_CHK_DROP_OLD(q->flags) && ph_vsn > ch_vsn && reserved_off != q->bs) - { - // 生产者追上了消费者,当前块以及未消费的全部 - // 如果reserved指向当前块的最后一个entry,可以移动head消费下一块,否则返回空 - return true; - } - - return false; -} - uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n, uint32_t * wait_consumed) { bbq_errno = BBQ_OK; @@ -1160,30 +1191,21 @@ uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n, ui bool bbq_malloc_free_equal() { #ifdef BBQ_MEMORY - bool ret = true; - for (int i = 0; i < BBQ_MODULE_MAX; i++) + if (bbq_memory_g.malloc_cnt != bbq_memory_g.free_cnt) { - uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); - uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); - if (malloc_cnt != free_cnt) - { - BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", i, malloc_cnt, free_cnt); - ret = false; - } + BBQ_ERR_LOG("malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", bbq_memory_g.malloc_cnt, + bbq_memory_g.free_cnt); + return false; + } - uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size); - uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size); - if (malloc_size != free_size) - { - BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, - free_size); - ret = false; - } + if (bbq_memory_g.malloc_size != bbq_memory_g.free_size) + { + BBQ_ERR_LOG("malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", bbq_memory_g.malloc_size, + bbq_memory_g.free_size); + return false; } - return ret; -#else - return true; #endif + return true; } bool bbq_debug_check_array_bounds(struct bbq * q) @@ -1210,18 +1232,7 @@ bool bbq_debug_check_array_bounds(struct bbq * q) void bbq_debug_memory_print() { #ifdef BBQ_MEMORY - for (int i = 0; i < BBQ_MODULE_MAX; i++) - { - uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); - uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); - if (malloc_cnt == 0 && free_cnt == 0) - { - continue; - } - - printf("[%d]bbq malloc:%lu free:%lu\n", i, atomic_load(&bbq_memory_g[i].malloc_cnt), - atomic_load(&bbq_memory_g[i].free_cnt)); - } + printf("bbq malloc:%lu free:%lu\n", bbq_memory_g.malloc_cnt, bbq_memory_g.free_cnt); if (bbq_malloc_free_equal()) { @@ -1236,27 +1247,31 @@ void bbq_debug_memory_print() void bbq_debug_block_print(struct bbq * q, struct bbq_block * block) { - bbq_cursor allocated = atomic_load(&block->allocated); - bbq_cursor committed = atomic_load(&block->committed); - bbq_cursor reserved = atomic_load(&block->reserved); - bbq_cursor consumed = atomic_load(&block->consumed); - printf(" allocated:%lu committed:%lu reserved:%lu", bbq_cur_off(q, allocated), bbq_cur_off(q, committed), - bbq_cur_off(q, reserved)); + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + + uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single); + uint64_t committed = bbq_atomic64_load(&block->committed, prod_single); + uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single); + uint64_t consumed = bbq_atomic64_load(&block->consumed, cons_single); + printf(" allocated:%lu(ver:%lu) committed:%lu(ver:%lu) reserved:%lu(ver:%lu)", bbq_cur_off(q, allocated), + bbq_cur_vsn(q, allocated), bbq_cur_off(q, committed), bbq_cur_vsn(q, committed), bbq_cur_off(q, reserved), + bbq_cur_vsn(q, reserved)); if (BBQ_F_CHK_DROP_OLD(q->flags)) { printf("\n"); } else { - printf(" consumed:%lu\n", bbq_cur_off(q, consumed)); + printf(" consumed:%lu(ver:%lu)\n", bbq_cur_off(q, consumed), bbq_cur_vsn(q, consumed)); } } void bbq_debug_struct_print(struct bbq * q) { printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); - uint64_t phead = atomic_load(&q->phead); - uint64_t chead = atomic_load(&q->chead); + uint64_t phead = bbq_atomic64_load(&q->phead.value, q->prod_single); + uint64_t chead = bbq_atomic64_load(&q->chead.value, q->cons_single); printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index b1643f9..43dbb72 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -31,6 +31,18 @@ #include #include +#ifdef BBQ_SPSC +void * bbq_malloc_callback(int32_t socket_id, size_t size) +{ + return ZMALLOC(size); +} + +void bbq_free_callback(void * ptr, size_t size) +{ + FREE(ptr); +} +#endif + /* What is a tunnel ? * tunnel is a fifo with fixed size. we use rte_ring as tunnel. * @@ -48,7 +60,8 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl MR_VERIFY_MALLOC(desc); #ifdef BBQ_SPSC - desc->tunnel_object = bbq_create(symbol, sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); + desc->tunnel_object = bbq_create(symbol, sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW, + bbq_malloc_callback, bbq_free_callback); #else desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); -- cgit v1.2.3