diff options
Diffstat (limited to 'bbq/src/bbq.c')
| -rw-r--r-- | bbq/src/bbq.c | 1077 |
1 files changed, 1077 insertions, 0 deletions
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c new file mode 100644 index 0000000..0ae6c5d --- /dev/null +++ b/bbq/src/bbq.c @@ -0,0 +1,1077 @@ +/* + * @Author: liuyu + * @LastEditTime: 2024-07-07 17:44:50 + * @Email: [email protected] + * @Describe: bbq(Block-based Bounded Queue)实现 + * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf + */ +#include "bbq.h" +#include <math.h> +#include <string.h> + +// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针" +#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */ +#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ + +// 判断flags标记位 +#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) +#define BBQ_F_CHK_COPY_VALUE(flags) (flags & BBQ_F_COPY_VALUE) +#define BBQ_F_CHK_SP_ENQ(flags) (flags & BBQ_F_SP_ENQ) +#define BBQ_F_CHK_SC_DEQ(flags) (flags & BBQ_F_SC_DEQ) +#define BBQ_F_CHK_STAT_ENABLE(flags) (flags & BBQ_F_ENABLE_STAT) + +// 避免无用参数的编译告警 +#define AVOID_WARNING(param) ((void)param) + +// 在Debug编译时,BBQ_MEMORY宏定义开关被打开,将在每个块的末尾多分配一个entry,值为BBQ_MEM_MAGIC,方便排查越界问题。 +#ifdef BBQ_MEMORY +#define BBQ_MEM_MAGIC 0xFE +#endif + +#define bbq_likely(x) __builtin_expect(!!(x), 1) +#define bbq_unlikely(x) __builtin_expect(!!(x), 0) + +struct bbq_status { + int32_t status; // 返回状态 + uint32_t actual_burst; // 实际出/入队个数 +}; + +enum bbq_queue_state { + BBQ_SUCCESS = 0, + BBQ_BLOCK_DONE, // 当前块的entry已用完,需要移动到下一个块 + BBQ_NO_ENTRY, // 队列里没有entry可以使用了 + BBQ_NOT_AVAILABLE, // 当前块不可以用状态(将返回busy) + BBQ_ALLOCATED, // 已分配,返回entry信息 + BBQ_RESERVED, // 已保留,返回entry信息 +}; + +struct bbq_entry_desc { + uint64_t vsn; // allocated或reserved的版本(vsn) + uint64_t off; // entry在当前块的偏移(offset) + uint32_t actual_burst; // 实际出/入队个数 + struct bbq_block *block; // 指向所在的块 +}; + +struct bbq_queue_state_s { + enum bbq_queue_state state; // 队列状态 + union { + uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效 + struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效 + }; +}; + +extern inline uint64_t bbq_head_idx(struct bbq *q, uint64_t x) { + return x & q->idx_mask; +} + +extern inline uint64_t bbq_head_vsn(struct bbq *q, uint64_t x) { + return x >> q->idx_bits; +} + +extern inline uint64_t bbq_cur_off(struct bbq *q, uint64_t x) { + return x & q->off_mask; +} + +extern inline uint64_t bbq_cur_vsn(struct bbq *q, uint64_t x) { + return x >> q->off_bits; +} + +static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) { + return ver << q->off_bits; +} + +extern inline uint64_t bbq_atomic64_load(union bbq_atomic64 *atomic, bool single) { + if (single) { + return atomic->s; + } else { + return atomic_load(&atomic->m); + } +} + +extern inline void bbq_atomic64_store(union bbq_atomic64 *atomic, uint64_t value, bool single) { + if (single) { + atomic->s = value; + } else { + atomic_store(&atomic->m, value); + } +} +static inline uint64_t bbq_atomic64_fetch_add(union bbq_atomic64 *atomic, uint64_t value, bool single) { + if (single) { + uint64_t old = atomic->s; + atomic->s += value; + return old; + } else { + return atomic_fetch_add(&atomic->m, value); + } +} + +/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ +uint64_t bbq_fetch_max(union bbq_atomic64 *atomic, uint64_t upd, bool single) { + uint64_t old_value = 0; + + if (single) { + old_value = atomic->s; + atomic->s = upd; + } else { + do { + old_value = atomic_load(&atomic->m); + } while (old_value < upd && !atomic_compare_exchange_weak(&atomic->m, &old_value, upd)); + } + + return old_value; +} + +/* 检查参数是否为2的N次幂 */ +bool bbq_check_power_of_two(uint32_t n) { + if (n == 0) { + return false; + } + + return (n & (n - 1)) == 0; +} + +/* 根据entries大小返回合理的block个数 + * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ +static uint32_t bbq_block_number_calc(uint32_t entries) { + double log_entries = log2((double)entries); + uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 + uint32_t max_value = (over4 > 1) ? over4 : 1; + uint32_t n = pow(2, max_value); + + return n; +} + +/* 根据entries大小返回合理的bn、bs*/ +int bbq_bnbs_calc(uint32_t entries, uint32_t *bn, uint32_t *bs) { + if (bn == NULL || bs == NULL) { + return BBQ_ERR_INPUT_NULL; + } + + if (entries <= 1) { + return BBQ_ERR_OUT_OF_RANGE; + } + + if (bbq_check_power_of_two(entries) == false) { + return BBQ_ERR_POWER_OF_TWO; + } + + *bn = bbq_block_number_calc(entries); + *bs = entries / *bn; + + return BBQ_OK; +} + +void *bbq_malloc_from_pool(struct bbq *q, size_t size) { + if (q->memory_pool.size - q->memory_pool.off < size) { + return NULL; + } + + char *mem = q->memory_pool.ptr + q->memory_pool.off; + q->memory_pool.off += size; + + return mem; +} + +/* 块初始化 */ +int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init) { + size_t size = 0; + +#ifdef BBQ_MEMORY + // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题 + size = (q->bs + 1) * q->entry_size; + block->entries = bbq_malloc_from_pool(q, size); + char *last_entry = block->entries + q->entry_size * q->bs; + memset(block->entries, 0, size); + memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); +#else + size = q->bs * q->entry_size; + block->entries = bbq_malloc_from_pool(q, size); + memset(block->entries, 0, size); +#endif + + if (block->entries == NULL) { + return BBQ_ERR_ALLOC; + } + + if (cursor_init) { + // block数组里,除了第一块之外需要设置 + bbq_atomic64_store(&block->committed, q->bs, q->prod_single); + bbq_atomic64_store(&block->allocated, q->bs, q->prod_single); + bbq_atomic64_store(&block->reserved, q->bs, q->cons_single); + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { + bbq_atomic64_store(&block->consumed, q->bs, q->cons_single); + } + } + + return BBQ_OK; +} + +/* 块清理函数,与block_init成对*/ +void block_destory(struct bbq *q, struct bbq_block *block) { + if (block->entries) { +#ifdef BBQ_MEMORY + q->memory_pool.free_f(block->entries, (q->bs + 1) * q->entry_size); +#else + q->memory_pool.free_f(block->entries, q->bs * q->entry_size); +#endif + block->entries = NULL; + } +} + +/* +求x在二进制表示中最高位1所在的位置,x参数不能为0。 +例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1) +*/ +static unsigned bbq_floor_log2(uint64_t x) { + return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1); +} + +/* +返回以2为底x的对数,并向上取整值。 +例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128) +*/ +static unsigned bbq_ceil_log2(uint64_t x) { + return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1; +} + +/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ +static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { + int ret = 0; + size_t size = 0; + if (bbq_check_power_of_two(bn) == false) { + return NULL; + } + + if (bbq_check_power_of_two(bs) == false) { + return NULL; + } + + if (name == NULL) { + return NULL; + } + + if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0) { + return NULL; + } + + if (BBQ_F_CHK_DROP_OLD(flags) && BBQ_F_CHK_STAT_ENABLE(flags)) { + return NULL; + } + + if (malloc_f == NULL || free_f == NULL) { + return NULL; + } + + uint32_t all_size = 0; +#ifdef BBQ_MEMORY + all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * (bs + 1) * obj_size; +#else + all_size = sizeof(struct bbq) + bn * sizeof(struct bbq_block) + bn * bs * obj_size; +#endif + struct bbq *q = malloc_f(socket_id, all_size); + if (q == NULL) { + return NULL; + } + + memset(q, 0, all_size); + q->memory_pool.size = all_size; + q->memory_pool.ptr = (char *)q; + q->memory_pool.off += sizeof(struct bbq); + q->memory_pool.malloc_f = malloc_f; + q->memory_pool.free_f = free_f; + q->bn = bn; + q->bs = bs; + q->entry_size = obj_size; + q->socket_id = socket_id; + strncpy(q->name, name, sizeof(q->name) - 1); + q->name[sizeof(q->name) - 1] = '\0'; + + if (BBQ_F_CHK_SP_ENQ(flags)) { + q->prod_single = true; + } + if (BBQ_F_CHK_SC_DEQ(flags)) { + q->cons_single = true; + } + q->flags = flags; + + size = bn * sizeof(*q->blocks); + q->blocks = bbq_malloc_from_pool(q, size); + if (q->blocks == NULL) { + goto error; + } + memset(q->blocks, 0, size); + + for (uint32_t i = 0; i < bn; ++i) { + ret = block_init(q, &(q->blocks[i]), (i == 0 ? false : true)); + if (ret != BBQ_OK) { + goto error; + } + } + + q->idx_bits = bbq_ceil_log2(bn); + uint32_t off_bits = bbq_ceil_log2(bs); + if (off_bits < 13) { + off_bits = 13; + } + q->off_bits = off_bits; // 多线程同时FAA,可能会超过最大索引,因此多分配一些空间,防止FAA溢出 TODO:多少合适?ver溢出问题? + + q->idx_mask = (1 << q->idx_bits) - 1; + q->off_mask = (1 << q->off_bits) - 1; + + return q; + +error: + bbq_destory(q); + return NULL; +} + +/* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */ +struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); +} + +/* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */ +struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); +} + +/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ +struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { + uint32_t bn = 0; + uint32_t bs = 0; + + if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) { + return NULL; + } + + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); +} + +struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { + uint32_t bn = 0; + uint32_t bs = 0; + + if (bbq_bnbs_calc(count, &bn, &bs) != BBQ_OK) { + return NULL; + } + + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); +} + +/* 释放消息队列,与bbq_ring_create系列接口成对*/ +void bbq_destory(struct bbq *q) { + if (q == NULL) { + return; + } + + q->memory_pool.free_f(q->memory_pool.ptr, q->memory_pool.size); +} + +#define BBQ_DATA_TYPE_SINGLE 0x0 +#define BBQ_DATA_TYPE_1D_ARRAY 0x1 +#define BBQ_DATA_TYPE_2D_ARRAY 0x2 +void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, uint32_t data_type) { + size_t idx = e->off * q->entry_size; + + if (BBQ_F_CHK_COPY_VALUE(q->flags)) { + // 数据入队列 + switch (data_type) { + case BBQ_DATA_TYPE_1D_ARRAY: + case BBQ_DATA_TYPE_SINGLE: + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_2D_ARRAY: { + void **tmp = (void **)data; + char *entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) { + memcpy(entry, *tmp, q->entry_size); + entry += q->entry_size; + tmp++; + } + break; + } + default: + break; + } + } else { + // 指针入队列 + switch (data_type) { + case BBQ_DATA_TYPE_2D_ARRAY: + case BBQ_DATA_TYPE_SINGLE: + // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0]) + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_1D_ARRAY: + default: + break; + } + } + bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst, q->prod_single); +} + +struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) { + struct bbq_queue_state_s state = {0}; + uint64_t ph_idx = bbq_head_idx(q, ph); + bool prod_single = q->prod_single; + + struct bbq_block *block = &(q->blocks[ph_idx]); + if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated, prod_single)) >= q->bs) { + state.state = BBQ_BLOCK_DONE; + return state; + } + + uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n, prod_single); + uint64_t cur_vsn = bbq_cur_vsn(q, old); + uint64_t cur_off = bbq_cur_off(q, old); + + if (cur_off >= q->bs) { + state.state = BBQ_BLOCK_DONE; + return state; + } + + if (cur_off + n <= q->bs) { + // 可以全部入队 + state.e.actual_burst = n; + } else { + // 部分入队 + state.e.actual_burst = q->bs - cur_off; + } + state.e.block = block; + state.e.vsn = cur_vsn; + state.e.off = cur_off; + state.state = BBQ_ALLOCATED; + + return state; +} + +enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { + uint64_t cur = 0; + // 获取下一个block + struct bbq_block *n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]); + uint64_t ph_vsn = bbq_head_vsn(q, ph); + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + cur = bbq_atomic64_load(&n_blk->committed, prod_single); + // 生产者head避免覆盖上一轮尚未完全提交的区块 + if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { + return BBQ_NOT_AVAILABLE; + } + } else { + cur = bbq_atomic64_load(&n_blk->consumed, cons_single); + uint64_t reserved; + uint64_t consumed_off = bbq_cur_off(q, cur); + uint64_t consumed_vsn = bbq_cur_vsn(q, cur); + + if (consumed_vsn < ph_vsn || + (consumed_vsn == ph_vsn && consumed_off != q->bs)) { + // 生产者赶上了消费者 + reserved = bbq_atomic64_load(&n_blk->reserved, cons_single); + if (bbq_cur_off(q, reserved) == consumed_off) { + return BBQ_NO_ENTRY; + } else { + return BBQ_NOT_AVAILABLE; + } + } + } + + // 用head的version值初始化下一个块,version在高位,version+1,index或offset也会被清零 + uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1); + // 其他线程完成了head更新,当前bbq_fetch_max不会再更新,可能在以下两种情况: + // 1)实际ph_vsn与本次要更新的ph_vsn相同。 + // 2)当前ph_vsn已经落后于实际的ph_vsn(且移动到了下一轮), + bbq_fetch_max(&n_blk->committed, new_vsn, prod_single); + bbq_fetch_max(&n_blk->allocated, new_vsn, prod_single); + + // ph+1,当超过索引范围,进入下一轮时,version会自动+1 + bbq_fetch_max(&q->phead.value, ph + 1, prod_single); + return BBQ_SUCCESS; +} + +bool bbq_empty(struct bbq *q) { + return bbq_atomic64_load(&q->stat.n_enq, q->prod_single) == bbq_atomic64_load(&q->stat.n_deq, q->cons_single); +} + +static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) { + uint64_t enq_now = 0; + uint64_t deq_now = 0; + + if (enq_update == 0) { + enq_now = bbq_atomic64_load(&q->stat.n_enq, q->prod_single); + } else { + enq_now = enq_update; + } + + if (deq_update == 0) { + deq_now = bbq_atomic64_load(&q->stat.n_deq, q->cons_single); + } else { + deq_now = deq_update; + } + + return enq_now - deq_now; +} + +/* 消息队列入队 */ +static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, + uint32_t n, uint32_t data_type, + uint32_t *wait_consumed) { + uint64_t enq_update = 0; + struct bbq_status ret = {.status = 0, .actual_burst = 0}; + bool prod_single = q->prod_single; + + if (bbq_unlikely(q == NULL || data == NULL)) { + ret.status = BBQ_ERR_INPUT_NULL; + return ret; + } + + while (true) { + uint64_t ph = bbq_atomic64_load(&q->phead.value, prod_single); + struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n); + + switch (state.state) { + case BBQ_ALLOCATED: + bbq_commit_entry(q, &state.e, data, data_type); + ret.actual_burst = state.e.actual_burst; + ret.status = BBQ_OK; + + if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { + enq_update = bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst, prod_single) + + state.e.actual_burst; + } + break; + case BBQ_BLOCK_DONE: { + enum bbq_queue_state pstate = advance_phead(q, ph); + if (pstate == BBQ_SUCCESS) { + continue; + } + + if (pstate == BBQ_NO_ENTRY) { + ret.status = BBQ_ERR_FULL; + } else if (pstate == BBQ_NOT_AVAILABLE) { + ret.status = BBQ_ERR_BUSY; + } else { + ret.status = BBQ_ERR; + } + + break; + } + default: + ret.status = BBQ_ERR; + break; + } + + if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { + *wait_consumed = bbq_wait_consumed_get(q, enq_update, 0); + } + + return ret; + } +} + +int bbq_enqueue(struct bbq *q, void *const *data) { + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +int bbq_enqueue_elem(struct bbq *q, void const *data) { + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +/* 更新reserved,成功则返回更新的个数 */ +uint32_t bbq_reserve_update(union bbq_atomic64 *atomic, uint64_t reserved, uint32_t n, bool single) { + if (single) { + atomic->s += n; + return n; + } else { + // 不能用fetch_max,比如a线程burst2,b线程burst3,a更新成功,b也更新成功。 + bool ret = atomic_compare_exchange_weak(&atomic->m, &reserved, reserved + n); + return ret == true ? n : 0; + } +} + +struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { + uint32_t cont = 0; + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + + while (true) { + struct bbq_queue_state_s state; + uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single); + uint64_t reserved_off = bbq_cur_off(q, reserved); + uint64_t reserved_svn = bbq_cur_vsn(q, reserved); + + if (reserved_off < q->bs) { + uint64_t committed = bbq_atomic64_load(&block->committed, prod_single); + uint64_t committed_off = bbq_cur_off(q, committed); + if (committed_off == reserved_off) { + state.state = BBQ_NO_ENTRY; + return state; + } + + // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 + if (committed_off != q->bs) { + uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single); + if (bbq_cur_off(q, allocated) != committed_off) { + state.state = BBQ_NOT_AVAILABLE; + return state; + } + } + + uint32_t tmp = committed_off - reserved_off; + uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n, q->cons_single); + if (reserved_cnt > 0) { + state.state = BBQ_RESERVED; + state.e.actual_burst = reserved_cnt; + state.e.block = block; + state.e.off = reserved_off; + state.e.vsn = reserved_svn; + + return state; + } else { + // 已经被其他线程更新过了,当前数据为旧数据,需要重新获取 + cont++; + continue; + } + } + + state.state = BBQ_BLOCK_DONE; + state.vsn = reserved_svn; + return state; + } +} + +bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) { + size_t idx = e->off * q->entry_size; + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + + if (BBQ_F_CHK_COPY_VALUE(q->flags)) { + switch (data_type) { + case BBQ_DATA_TYPE_1D_ARRAY: + case BBQ_DATA_TYPE_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_2D_ARRAY: { + void **tmp = (void **)deq_data; + char *entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) { + memcpy(*tmp, entry, q->entry_size); + entry += q->entry_size; + tmp++; + } + break; + } + default: + break; + } + } else { + switch (data_type) { + case BBQ_DATA_TYPE_2D_ARRAY: + case BBQ_DATA_TYPE_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_DATA_TYPE_1D_ARRAY: + default: + break; + } + } + + uint64_t allocated; + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? + allocated = bbq_atomic64_load(&e->block->allocated, prod_single); + // 预留的entry所在的块,已经被新生产的数据赶上了 + if (bbq_cur_vsn(q, allocated) != e->vsn) { + return false; + } + } else { + bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst, cons_single); + } + + return true; +} + +bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { + uint64_t ch_idx = bbq_head_idx(q, ch); + struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t committed = bbq_atomic64_load(&n_blk->committed, prod_single); + uint64_t committed_vsn = bbq_cur_vsn(q, committed); + + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. + // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 + if (committed_vsn < ver + (ch_idx == 0)) { + return false; + } + + bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn), cons_single); + } else { + if (committed_vsn != ch_vsn + 1) { + // 消费者追上了生产者,下一块还未开始生产 + return false; + } + uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1); + bbq_fetch_max(&n_blk->consumed, new_vsn, cons_single); + bbq_fetch_max(&n_blk->reserved, new_vsn, cons_single); + } + + bbq_fetch_max(&q->chead.value, ch + 1, cons_single); + return true; +} + +/* 消息队列出队 */ +static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { + uint64_t deq_update = 0; + bool cons_single = q->cons_single; + struct bbq_status ret = {.status = 0, .actual_burst = 0}; + if (bbq_unlikely(q == NULL || deq_data == NULL)) { + ret.status = BBQ_ERR_INPUT_NULL; + return ret; + } + + while (true) { + uint64_t ch = bbq_atomic64_load(&q->chead.value, cons_single); + struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]); + struct bbq_queue_state_s state; + state = bbq_reserve_entry(q, blk, n); + + switch (state.state) { + case BBQ_RESERVED: + if (!consume_entry(q, &state.e, deq_data, data_type)) { + continue; + } + ret.status = BBQ_OK; + ret.actual_burst = state.e.actual_burst; + + if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { + deq_update = bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst, cons_single) + + state.e.actual_burst; + } + break; + case BBQ_NO_ENTRY: + ret.status = BBQ_ERR_EMPTY; + break; + case BBQ_NOT_AVAILABLE: + ret.status = BBQ_ERR_BUSY; + break; + case BBQ_BLOCK_DONE: + if (advance_chead(q, ch, state.vsn)) { + continue; + } + ret.status = BBQ_ERR_EMPTY; + break; + default: + ret.status = BBQ_ERR; + break; + } + + if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) { + *wait_consumed = bbq_wait_consumed_get(q, 0, deq_update); + } + + return ret; + } +} + +int bbq_dequeue(struct bbq *q, void **data) { + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +int bbq_dequeue_elem(struct bbq *q, void *data) { + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); + return ret.status; +} + +static inline uint32_t bbq_max_burst(struct bbq *q, uint32_t n) { + if (bbq_unlikely(n > q->bs)) { + return q->bs; + } + + return n; +} + +static uint32_t bbq_dequeue_burst_1d_array(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) { + if (bbq_unlikely(q == NULL || obj_table == NULL)) { + return 0; + } + + if (bbq_unlikely(!BBQ_F_CHK_COPY_VALUE(q->flags))) { + return 0; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void *obj = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_1D_ARRAY, wait_consumed); + if (ret.status != BBQ_OK) { + break; + } + obj += q->entry_size * ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +static uint32_t bbq_dequeue_burst_2d_array(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) { + if (bbq_unlikely(q == NULL || obj_table == NULL)) { + return 0; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void **obj_table_tmp = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_2D_ARRAY, wait_consumed); + if (ret.status != BBQ_OK) { + break; + } + obj_table_tmp += ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ +static uint32_t bbq_enqueue_burst_1d_array(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { + if (bbq_unlikely(q == NULL || obj_table == NULL)) { + return 0; + } + + if (bbq_unlikely(!BBQ_F_CHK_COPY_VALUE(q->flags))) { + return 0; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void const *obj = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_1D_ARRAY, wait_consumed); + if (ret.status != BBQ_OK) { + break; + } + obj += q->entry_size * ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ +static uint32_t bbq_enqueue_burst_2d_array(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + if (bbq_unlikely(q == NULL || obj_table == NULL)) { + return 0; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void *const *obj_table_tmp = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_2D_ARRAY, wait_consumed); + if (ret.status != BBQ_OK) { + break; + } + obj_table_tmp += ret.actual_burst; + ready += ret.actual_burst; + } + + return ready; +} + +uint32_t bbq_enqueue_burst_elem(struct bbq *q, void const *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_enqueue_burst_1d_array(q, obj_table, n, wait_consumed); +} + +uint32_t bbq_enqueue_burst_elem_2d_array(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_enqueue_burst_2d_array(q, obj_table, n, wait_consumed); +} + +uint32_t bbq_enqueue_burst(struct bbq *q, void *const *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_enqueue_burst_2d_array(q, obj_table, n, wait_consumed); +} + +uint32_t bbq_dequeue_burst(struct bbq *q, void **obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_dequeue_burst_2d_array(q, obj_table, n, wait_consumed); +} + +uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint32_t *wait_consumed) { + return bbq_dequeue_burst_1d_array(q, obj_table, n, wait_consumed); +} + +bool bbq_debug_check_array_bounds(struct bbq *q) { +#ifdef BBQ_MEMORY + void *value = malloc(q->entry_size); + memset(value, BBQ_MEM_MAGIC, q->entry_size); + + for (size_t i = 0; i < q->bn; i++) { + // 针对内存检查版本,申请了bs+1个entry + char *last_entry = q->blocks[i].entries + q->bs * q->entry_size; + if (memcmp(last_entry, value, q->entry_size) != 0) { + return false; + } + } +#else + AVOID_WARNING(q); +#endif + return true; +} + +#if 0 +#include <stdio.h> +/* 位置有关代码,需要-fPIC,这里注释掉用于调试 */ +void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + + uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single); + uint64_t committed = bbq_atomic64_load(&block->committed, prod_single); + uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single); + uint64_t consumed = bbq_atomic64_load(&block->consumed, cons_single); + printf(" allocated:%lu(ver:%lu) committed:%lu(ver:%lu) reserved:%lu(ver:%lu)", + bbq_cur_off(q, allocated), bbq_cur_vsn(q, allocated), + bbq_cur_off(q, committed), bbq_cur_vsn(q, committed), + bbq_cur_off(q, reserved), bbq_cur_vsn(q, reserved)); + if (BBQ_F_CHK_DROP_OLD(q->flags)) { + printf("\n"); + } else { + printf(" consumed:%lu(ver:%lu)\n", bbq_cur_off(q, consumed), bbq_cur_vsn(q, consumed)); + } +} + +void bbq_debug_struct_print(struct bbq *q) { + printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); + uint64_t phead = bbq_atomic64_load(&q->phead.value, q->prod_single); + uint64_t chead = bbq_atomic64_load(&q->chead.value, q->cons_single); + + printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); + printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); + + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); + if (ph_idx != ch_idx) { + printf("block[%lu]\n", ph_idx); + bbq_debug_block_print(q, &(q->blocks[ph_idx])); + } + + printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead)); + printf("block[%lu]\n", ch_idx); + bbq_debug_block_print(q, &(q->blocks[ch_idx])); +} +#endif + +#if 0 +/* 根据实际head以及块上的游标推算出待消费的个数,该函数很影响性能 */ +static uint32_t bbq_wait_consumed_get_from_head(struct bbq *q, + uint64_t *ch_ptr, + uint64_t *ph_ptr, + struct bbq_block *blk_ph) { + uint64_t ch = 0; + uint64_t ph = 0; + if (ch_ptr != NULL) { + ch = *ch_ptr; + } else { + ch = bbq_atomic64_load(&q->chead.value); + } + + if (ph_ptr != NULL) { + ph = *ph_ptr; + } else { + ph = bbq_atomic64_load(&q->phead.value); + } + + uint64_t ph_idx = bbq_head_idx(q, ph); + uint64_t ch_idx = bbq_head_idx(q, ch); + uint64_t committed_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ph->committed)); + + struct bbq_block *blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); + uint64_t reserved_off = bbq_cur_off(q, bbq_atomic64_load(&blk_ch->reserved)); + + // "生产者"超过"消费者"块的个数 + uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { + // 这里idx_diff-1=-1也是正确。 + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); + } + + uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t ph_vsn = bbq_head_vsn(q, ph); + + if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) { + // drop old模式,未发生覆盖 + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); + } + + if (ph_idx == ch_idx) { + // drop old模式,发生了覆盖,当前块以及之前已生产的都作废 + return 0; + } + + return (idx_diff - 1) * q->bs + committed_off; +} + +/* 根据head偏移判断是否为空,耗性能函数 */ +bool bbq_empty_from_head(struct bbq *q) { + uint64_t phead = bbq_atomic64_load(&q->phead.value); + uint64_t chead = bbq_atomic64_load(&q->chead.value); + + uint64_t ph_vsn = bbq_head_vsn(q, phead); + uint64_t ch_vsn = bbq_head_vsn(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); + + struct bbq_block *block; + + if (ph_idx != ch_idx) { + return false; + } + + block = &q->blocks[ph_idx]; + if (ph_vsn == ch_vsn) { + if (bbq_cur_off(q, bbq_atomic64_load(&block->reserved)) == bbq_cur_off(q, bbq_atomic64_load(&block->committed))) { + return true; + } + } + + uint64_t reserved = bbq_atomic64_load(&block->reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); + + if (BBQ_F_CHK_DROP_OLD(q->flags) && + ph_vsn > ch_vsn && + reserved_off != q->bs) { + // 生产者追上了消费者,当前块以及未消费的全部 + // 如果reserved指向当前块的最后一个entry,可以移动head消费下一块,否则返回空 + return true; + } + + return false; +} +#endif
\ No newline at end of file |
