diff options
| author | liuyu <[email protected]> | 2024-07-02 07:58:09 -0400 |
|---|---|---|
| committer | liuyu <[email protected]> | 2024-07-02 07:58:09 -0400 |
| commit | 88f8c871bf527082df5e322d6778f1402de7f0fd (patch) | |
| tree | a3f14cd704d0f0ee6da0836f821f5db84656bb79 | |
| parent | 462c024c8abb494efa0491af1a5cff559be06453 (diff) | |
临时提交,bbq_atomic64里的single标识移出来
| -rw-r--r-- | bbq/include/bbq.h | 15 | ||||
| -rw-r--r-- | bbq/src/bbq.c | 135 |
2 files changed, 79 insertions, 71 deletions
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index ec102e5..2d1561f 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: [email protected] - * @LastEditTime: 2024-07-02 11:12:50 + * @LastEditTime: 2024-07-02 07:57:19 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -26,7 +26,6 @@ using aotmic_uint64 = std::atomic<uint64_t>; #define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(BBQ_CACHE_LINE))) struct bbq_atomic64 { - bool single; // 如果为单生产者或单消费者,则single为true union { volatile uint64_t s; // single使用该字段 aotmic_uint64 m; @@ -39,10 +38,10 @@ struct bbq_head { } __BBQ_CACHE_ALIGNED; struct bbq_block { - struct bbq_atomic64 committed; // 已提交(version|offset) - struct bbq_atomic64 allocated; // 已分配(version|offset) - struct bbq_atomic64 reserved; // 已预留(version|offset) - struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到 + struct bbq_atomic64 committed; // 生产者,已提交(version|offset) + struct bbq_atomic64 allocated; // 生产者,已分配(version|offset) + struct bbq_atomic64 reserved; // 消费者,已预留(version|offset) + struct bbq_atomic64 consumed; // 消费者,已消费(version|offset)注:在drop-old模式下没用到 char *entries __BBQ_CACHE_ALIGNED; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; @@ -69,8 +68,8 @@ struct bbq { uint64_t idx_mask; // idx_bits偏移后的掩码 uint64_t off_mask; // off_bits偏移后的掩码 uint64_t entry_size; // blocks.entries里每个entry的大小 - uint64_t pad0; - uint64_t pad1; + bool prod_single; // 如果为单生产者或单消费者,则single为true + bool cons_single; // 如果为单生产者或单消费者,则single为true struct bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx struct bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index 3c14bb8..2b72265 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-07-02 07:15:00 + * @LastEditTime: 2024-07-02 11:47:55 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -81,23 +81,23 @@ static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) { return ver << q->off_bits; } -uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic) { - if (atomic->single) { +uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic, bool single) { + if (single) { return atomic->s; } else { return atomic_load(&atomic->m); } } -void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value) { - if (atomic->single) { +void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value, bool single) { + if (single) { atomic->s = value; } else { atomic_store(&atomic->m, value); } } -static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value) { - if (atomic->single) { +static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value, bool single) { + if (single) { uint64_t old = atomic->s; atomic->s += value; return old; @@ -139,10 +139,10 @@ void bbq_free_def_callback(void *ptr, } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd) { +uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd, bool single) { uint64_t old_value = 0; - if (atomic->single) { + if (single) { old_value = atomic->s; atomic->s = upd; } else { @@ -228,23 +228,13 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_ return bbq_errno; } - if (BBQ_F_CHK_SP_ENQ(flags)) { - block->allocated.single = true; - block->committed.single = true; - } - - if (BBQ_F_CHK_SC_DEQ(flags)) { - block->reserved.single = true; - block->consumed.single = true; - } - if (cursor_init) { // block数组里,除了第一块之外需要设置 - bbq_atomic64_store(&block->committed, q->bs); - bbq_atomic64_store(&block->allocated, q->bs); - bbq_atomic64_store(&block->reserved, q->bs); + bbq_atomic64_store(&block->committed, q->bs, q->prod_single); + bbq_atomic64_store(&block->allocated, q->bs, q->prod_single); + bbq_atomic64_store(&block->reserved, q->bs, q->cons_single); if (!BBQ_F_CHK_DROP_OLD(q->flags)) { - bbq_atomic64_store(&block->consumed, q->bs); + bbq_atomic64_store(&block->consumed, q->bs, q->cons_single); } } @@ -348,10 +338,10 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, q->entry_size = obj_size; q->socket_id = socket_id; if (BBQ_F_CHK_SP_ENQ(flags)) { - q->phead.value.single = true; + q->prod_single = true; } if (BBQ_F_CHK_SC_DEQ(flags)) { - q->chead.value.single = true; + q->cons_single = true; } q->flags = flags; @@ -481,20 +471,21 @@ void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, break; } } - bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst, q->prod_single); } struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) { struct bbq_queue_state_s state = {0}; uint64_t ph_idx = bbq_head_idx(q, ph); + bool prod_single = q->prod_single; struct bbq_block *block = &(q->blocks[ph_idx]); - if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) { + if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated, prod_single)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } - uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n); + uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n, prod_single); uint64_t cur_vsn = bbq_cur_vsn(q, old); uint64_t cur_off = bbq_cur_off(q, old); @@ -523,15 +514,17 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { // 获取下一个block struct bbq_block *n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]); uint64_t ph_vsn = bbq_head_vsn(q, ph); + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; if (BBQ_F_CHK_DROP_OLD(q->flags)) { - cur = bbq_atomic64_load(&n_blk->committed); + cur = bbq_atomic64_load(&n_blk->committed, prod_single); // 生产者head避免覆盖上一轮尚未完全提交的区块 if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; } } else { - cur = bbq_atomic64_load(&n_blk->consumed); + cur = bbq_atomic64_load(&n_blk->consumed, cons_single); uint64_t reserved; uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); @@ -539,7 +532,7 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { if (consumed_vsn < ph_vsn || (consumed_vsn == ph_vsn && consumed_off != q->bs)) { // 生产者赶上了消费者 - reserved = bbq_atomic64_load(&n_blk->reserved); + reserved = bbq_atomic64_load(&n_blk->reserved, cons_single); if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; } else { @@ -553,21 +546,22 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) { // 其他线程完成了head更新,当前bbq_fetch_max不会再更新,可能在以下两种情况: // 1)实际ph_vsn与本次要更新的ph_vsn相同。 // 2)当前ph_vsn已经落后于实际的ph_vsn(且移动到了下一轮), - bbq_fetch_max(&n_blk->committed, new_vsn); - bbq_fetch_max(&n_blk->allocated, new_vsn); + bbq_fetch_max(&n_blk->committed, new_vsn, prod_single); + bbq_fetch_max(&n_blk->allocated, new_vsn, prod_single); // ph+1,当超过索引范围,进入下一轮时,version会自动+1 - bbq_fetch_max(&q->phead.value, ph + 1); + bbq_fetch_max(&q->phead.value, ph + 1, prod_single); return BBQ_SUCCESS; } bool bbq_empty(struct bbq *q) { - return bbq_atomic64_load(&q->phead.count) == bbq_atomic64_load(&q->chead.count); + return bbq_atomic64_load(&q->phead.count, q->prod_single) == bbq_atomic64_load(&q->chead.count, q->cons_single); } static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) { - uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count) : enq_update; - uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count) : deq_update; + uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count, q->prod_single) : enq_update; + uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count, q->cons_single) : deq_update; + return enq_now - deq_now; } @@ -576,6 +570,7 @@ static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64 static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { uint64_t enq_update = 0; struct bbq_status ret = {.status = 0, .actual_burst = 0}; + bool prod_single = q->prod_single; if (q == NULL || data == NULL) { bbq_errno = BBQ_ERR_INPUT_NULL; @@ -584,7 +579,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t } while (true) { - uint64_t ph = bbq_atomic64_load(&q->phead.value); + uint64_t ph = bbq_atomic64_load(&q->phead.value, prod_single); struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n); switch (state.state) { @@ -594,7 +589,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t ret.status = BBQ_OK; if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { - enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst) + state.e.actual_burst; + enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst, prod_single) + state.e.actual_burst; } break; case BBQ_BLOCK_DONE: { @@ -643,15 +638,15 @@ int bbq_enqueue_elem(struct bbq *q, void const *data) { } /* 更新成功 reserve成功的个数 */ -uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n) { - if (atomic->single) { +uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n, bool single) { + if (single) { atomic->s += n; return n; } else { // TODO:合并逻辑? if (n == 1) { // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (bbq_fetch_max(atomic, reserved + 1) == reserved) { + if (bbq_fetch_max(atomic, reserved + 1, single) == reserved) { return 1; } @@ -666,14 +661,17 @@ uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) { uint32_t cont = 0; + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + while (true) { struct bbq_queue_state_s state; - uint64_t reserved = bbq_atomic64_load(&block->reserved); + uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single); uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) { - uint64_t committed = bbq_atomic64_load(&block->committed); + uint64_t committed = bbq_atomic64_load(&block->committed, prod_single); uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; @@ -682,7 +680,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 if (committed_off != q->bs) { - uint64_t allocated = bbq_atomic64_load(&block->allocated); + uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single); if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; @@ -690,7 +688,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc } uint32_t tmp = committed_off - reserved_off; - uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n, q->cons_single); if (reserved_cnt > 0) { state.state = BBQ_RESERVED; state.e.actual_burst = reserved_cnt; @@ -714,6 +712,8 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) { size_t idx = e->off * q->entry_size; + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; if (BBQ_F_CHK_COPY_VALUE(q->flags)) { switch (data_type) { @@ -749,13 +749,13 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint uint64_t allocated; if (BBQ_F_CHK_DROP_OLD(q->flags)) { // TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head? - allocated = bbq_atomic64_load(&e->block->allocated); + allocated = bbq_atomic64_load(&e->block->allocated, prod_single); // 预留的entry所在的块,已经被新生产的数据赶上了 if (bbq_cur_vsn(q, allocated) != e->vsn) { return false; } } else { - bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst); + bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst, cons_single); } return true; @@ -764,33 +764,38 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) { uint64_t ch_idx = bbq_head_idx(q, ch); struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); - + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t committed = bbq_atomic64_load(&n_blk->committed); + uint64_t committed = bbq_atomic64_load(&n_blk->committed, prod_single); uint64_t committed_vsn = bbq_cur_vsn(q, committed); + if (BBQ_F_CHK_DROP_OLD(q->flags)) { // 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序. // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 - if (committed_vsn < ver + (ch_idx == 0)) + if (committed_vsn < ver + (ch_idx == 0)) { return false; - bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn)); + } + + bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn), cons_single); } else { if (committed_vsn != ch_vsn + 1) { // 消费者追上了生产者,下一块还未开始生产 return false; } uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1); - bbq_fetch_max(&n_blk->consumed, new_vsn); - bbq_fetch_max(&n_blk->reserved, new_vsn); + bbq_fetch_max(&n_blk->consumed, new_vsn, cons_single); + bbq_fetch_max(&n_blk->reserved, new_vsn, cons_single); } - bbq_fetch_max(&q->chead.value, ch + 1); + bbq_fetch_max(&q->chead.value, ch + 1, cons_single); return true; } /* 消息队列出队 */ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) { uint64_t deq_update = 0; + bool cons_single = q->cons_single; struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { bbq_errno = BBQ_ERR_INPUT_NULL; @@ -799,7 +804,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n } while (true) { - uint64_t ch = bbq_atomic64_load(&q->chead.value); + uint64_t ch = bbq_atomic64_load(&q->chead.value, cons_single); struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; state = bbq_reserve_entry(q, blk, n); @@ -813,7 +818,8 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n ret.actual_burst = state.e.actual_burst; if (BBQ_F_CHK_STAT_ENABLE(q->flags)) { - deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst) + state.e.actual_burst; + deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst, cons_single) + + state.e.actual_burst; } break; case BBQ_NO_ENTRY: @@ -1049,10 +1055,13 @@ void bbq_debug_memory_print() { } void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { - uint64_t allocated = bbq_atomic64_load(&block->allocated); - uint64_t committed = bbq_atomic64_load(&block->committed); - uint64_t reserved = bbq_atomic64_load(&block->reserved); - uint64_t consumed = bbq_atomic64_load(&block->consumed); + bool prod_single = q->prod_single; + bool cons_single = q->cons_single; + + uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single); + uint64_t committed = bbq_atomic64_load(&block->committed, prod_single); + uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single); + uint64_t consumed = bbq_atomic64_load(&block->consumed, cons_single); printf(" allocated:%lu(ver:%lu) committed:%lu(ver:%lu) reserved:%lu(ver:%lu)", bbq_cur_off(q, allocated), bbq_cur_vsn(q, allocated), bbq_cur_off(q, committed), bbq_cur_vsn(q, committed), @@ -1066,8 +1075,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) { void bbq_debug_struct_print(struct bbq *q) { printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new"); - uint64_t phead = bbq_atomic64_load(&q->phead.value); - uint64_t chead = bbq_atomic64_load(&q->chead.value); + uint64_t phead = bbq_atomic64_load(&q->phead.value, q->prod_single); + uint64_t chead = bbq_atomic64_load(&q->chead.value, q->cons_single); printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); |
