summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorliuyu <[email protected]>2024-07-02 07:58:09 -0400
committerliuyu <[email protected]>2024-07-02 07:58:09 -0400
commit88f8c871bf527082df5e322d6778f1402de7f0fd (patch)
treea3f14cd704d0f0ee6da0836f821f5db84656bb79
parent462c024c8abb494efa0491af1a5cff559be06453 (diff)
临时提交,bbq_atomic64里的single标识移出来
-rw-r--r--bbq/include/bbq.h15
-rw-r--r--bbq/src/bbq.c135
2 files changed, 79 insertions, 71 deletions
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
index ec102e5..2d1561f 100644
--- a/bbq/include/bbq.h
+++ b/bbq/include/bbq.h
@@ -1,6 +1,6 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-07-02 11:12:50
+ * @LastEditTime: 2024-07-02 07:57:19
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
@@ -26,7 +26,6 @@ using aotmic_uint64 = std::atomic<uint64_t>;
#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(BBQ_CACHE_LINE)))
struct bbq_atomic64 {
- bool single; // 如果为单生产者或单消费者,则single为true
union {
volatile uint64_t s; // single使用该字段
aotmic_uint64 m;
@@ -39,10 +38,10 @@ struct bbq_head {
} __BBQ_CACHE_ALIGNED;
struct bbq_block {
- struct bbq_atomic64 committed; // 已提交(version|offset)
- struct bbq_atomic64 allocated; // 已分配(version|offset)
- struct bbq_atomic64 reserved; // 已预留(version|offset)
- struct bbq_atomic64 consumed; // 已消费(version|offset)注:在drop-old模式下没用到
+ struct bbq_atomic64 committed; // 生产者,已提交(version|offset)
+ struct bbq_atomic64 allocated; // 生产者,已分配(version|offset)
+ struct bbq_atomic64 reserved; // 消费者,已预留(version|offset)
+ struct bbq_atomic64 consumed; // 消费者,已消费(version|offset)注:在drop-old模式下没用到
char *entries __BBQ_CACHE_ALIGNED; // 存储大小可变的entry,每个块分配空间:bs * entry_size
} __BBQ_CACHE_ALIGNED;
@@ -69,8 +68,8 @@ struct bbq {
uint64_t idx_mask; // idx_bits偏移后的掩码
uint64_t off_mask; // off_bits偏移后的掩码
uint64_t entry_size; // blocks.entries里每个entry的大小
- uint64_t pad0;
- uint64_t pad1;
+ bool prod_single; // 如果为单生产者或单消费者,则single为true
+ bool cons_single; // 如果为单生产者或单消费者,则single为true
struct bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
struct bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index 3c14bb8..2b72265 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-07-02 07:15:00
+ * @LastEditTime: 2024-07-02 11:47:55
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -81,23 +81,23 @@ static inline uint64_t bbq_set_cur_vsn(struct bbq *q, uint64_t ver) {
return ver << q->off_bits;
}
-uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic) {
- if (atomic->single) {
+uint64_t bbq_atomic64_load(struct bbq_atomic64 *atomic, bool single) {
+ if (single) {
return atomic->s;
} else {
return atomic_load(&atomic->m);
}
}
-void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value) {
- if (atomic->single) {
+void bbq_atomic64_store(struct bbq_atomic64 *atomic, uint64_t value, bool single) {
+ if (single) {
atomic->s = value;
} else {
atomic_store(&atomic->m, value);
}
}
-static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value) {
- if (atomic->single) {
+static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint64_t value, bool single) {
+ if (single) {
uint64_t old = atomic->s;
atomic->s += value;
return old;
@@ -139,10 +139,10 @@ void bbq_free_def_callback(void *ptr,
}
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
-uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd) {
+uint64_t bbq_fetch_max(struct bbq_atomic64 *atomic, uint64_t upd, bool single) {
uint64_t old_value = 0;
- if (atomic->single) {
+ if (single) {
old_value = atomic->s;
atomic->s = upd;
} else {
@@ -228,23 +228,13 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_
return bbq_errno;
}
- if (BBQ_F_CHK_SP_ENQ(flags)) {
- block->allocated.single = true;
- block->committed.single = true;
- }
-
- if (BBQ_F_CHK_SC_DEQ(flags)) {
- block->reserved.single = true;
- block->consumed.single = true;
- }
-
if (cursor_init) {
// block数组里,除了第一块之外需要设置
- bbq_atomic64_store(&block->committed, q->bs);
- bbq_atomic64_store(&block->allocated, q->bs);
- bbq_atomic64_store(&block->reserved, q->bs);
+ bbq_atomic64_store(&block->committed, q->bs, q->prod_single);
+ bbq_atomic64_store(&block->allocated, q->bs, q->prod_single);
+ bbq_atomic64_store(&block->reserved, q->bs, q->cons_single);
if (!BBQ_F_CHK_DROP_OLD(q->flags)) {
- bbq_atomic64_store(&block->consumed, q->bs);
+ bbq_atomic64_store(&block->consumed, q->bs, q->cons_single);
}
}
@@ -348,10 +338,10 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
q->entry_size = obj_size;
q->socket_id = socket_id;
if (BBQ_F_CHK_SP_ENQ(flags)) {
- q->phead.value.single = true;
+ q->prod_single = true;
}
if (BBQ_F_CHK_SC_DEQ(flags)) {
- q->chead.value.single = true;
+ q->cons_single = true;
}
q->flags = flags;
@@ -481,20 +471,21 @@ void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data,
break;
}
}
- bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst);
+ bbq_atomic64_fetch_add(&e->block->committed, e->actual_burst, q->prod_single);
}
struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) {
struct bbq_queue_state_s state = {0};
uint64_t ph_idx = bbq_head_idx(q, ph);
+ bool prod_single = q->prod_single;
struct bbq_block *block = &(q->blocks[ph_idx]);
- if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) {
+ if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated, prod_single)) >= q->bs) {
state.state = BBQ_BLOCK_DONE;
return state;
}
- uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n);
+ uint64_t old = bbq_atomic64_fetch_add(&block->allocated, n, prod_single);
uint64_t cur_vsn = bbq_cur_vsn(q, old);
uint64_t cur_off = bbq_cur_off(q, old);
@@ -523,15 +514,17 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
// 获取下一个block
struct bbq_block *n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]);
uint64_t ph_vsn = bbq_head_vsn(q, ph);
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
- cur = bbq_atomic64_load(&n_blk->committed);
+ cur = bbq_atomic64_load(&n_blk->committed, prod_single);
// 生产者head避免覆盖上一轮尚未完全提交的区块
if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) {
return BBQ_NOT_AVAILABLE;
}
} else {
- cur = bbq_atomic64_load(&n_blk->consumed);
+ cur = bbq_atomic64_load(&n_blk->consumed, cons_single);
uint64_t reserved;
uint64_t consumed_off = bbq_cur_off(q, cur);
uint64_t consumed_vsn = bbq_cur_vsn(q, cur);
@@ -539,7 +532,7 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
if (consumed_vsn < ph_vsn ||
(consumed_vsn == ph_vsn && consumed_off != q->bs)) {
// 生产者赶上了消费者
- reserved = bbq_atomic64_load(&n_blk->reserved);
+ reserved = bbq_atomic64_load(&n_blk->reserved, cons_single);
if (bbq_cur_off(q, reserved) == consumed_off) {
return BBQ_NO_ENTRY;
} else {
@@ -553,21 +546,22 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
// 其他线程完成了head更新,当前bbq_fetch_max不会再更新,可能在以下两种情况:
// 1)实际ph_vsn与本次要更新的ph_vsn相同。
// 2)当前ph_vsn已经落后于实际的ph_vsn(且移动到了下一轮),
- bbq_fetch_max(&n_blk->committed, new_vsn);
- bbq_fetch_max(&n_blk->allocated, new_vsn);
+ bbq_fetch_max(&n_blk->committed, new_vsn, prod_single);
+ bbq_fetch_max(&n_blk->allocated, new_vsn, prod_single);
// ph+1,当超过索引范围,进入下一轮时,version会自动+1
- bbq_fetch_max(&q->phead.value, ph + 1);
+ bbq_fetch_max(&q->phead.value, ph + 1, prod_single);
return BBQ_SUCCESS;
}
bool bbq_empty(struct bbq *q) {
- return bbq_atomic64_load(&q->phead.count) == bbq_atomic64_load(&q->chead.count);
+ return bbq_atomic64_load(&q->phead.count, q->prod_single) == bbq_atomic64_load(&q->chead.count, q->cons_single);
}
static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) {
- uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count) : enq_update;
- uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count) : deq_update;
+ uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count, q->prod_single) : enq_update;
+ uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count, q->cons_single) : deq_update;
+
return enq_now - deq_now;
}
@@ -576,6 +570,7 @@ static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64
static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) {
uint64_t enq_update = 0;
struct bbq_status ret = {.status = 0, .actual_burst = 0};
+ bool prod_single = q->prod_single;
if (q == NULL || data == NULL) {
bbq_errno = BBQ_ERR_INPUT_NULL;
@@ -584,7 +579,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
while (true) {
- uint64_t ph = bbq_atomic64_load(&q->phead.value);
+ uint64_t ph = bbq_atomic64_load(&q->phead.value, prod_single);
struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n);
switch (state.state) {
@@ -594,7 +589,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
ret.status = BBQ_OK;
if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
- enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst) + state.e.actual_burst;
+ enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst, prod_single) + state.e.actual_burst;
}
break;
case BBQ_BLOCK_DONE: {
@@ -643,15 +638,15 @@ int bbq_enqueue_elem(struct bbq *q, void const *data) {
}
/* 更新成功 reserve成功的个数 */
-uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n) {
- if (atomic->single) {
+uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint32_t n, bool single) {
+ if (single) {
atomic->s += n;
return n;
} else {
// TODO:合并逻辑?
if (n == 1) {
// fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
- if (bbq_fetch_max(atomic, reserved + 1) == reserved) {
+ if (bbq_fetch_max(atomic, reserved + 1, single) == reserved) {
return 1;
}
@@ -666,14 +661,17 @@ uint32_t bbq_reserve_update(struct bbq_atomic64 *atomic, uint64_t reserved, uint
struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *block, uint32_t n) {
uint32_t cont = 0;
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+
while (true) {
struct bbq_queue_state_s state;
- uint64_t reserved = bbq_atomic64_load(&block->reserved);
+ uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single);
uint64_t reserved_off = bbq_cur_off(q, reserved);
uint64_t reserved_svn = bbq_cur_vsn(q, reserved);
if (reserved_off < q->bs) {
- uint64_t committed = bbq_atomic64_load(&block->committed);
+ uint64_t committed = bbq_atomic64_load(&block->committed, prod_single);
uint64_t committed_off = bbq_cur_off(q, committed);
if (committed_off == reserved_off) {
state.state = BBQ_NO_ENTRY;
@@ -682,7 +680,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc
// 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据
if (committed_off != q->bs) {
- uint64_t allocated = bbq_atomic64_load(&block->allocated);
+ uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single);
if (bbq_cur_off(q, allocated) != committed_off) {
state.state = BBQ_NOT_AVAILABLE;
return state;
@@ -690,7 +688,7 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc
}
uint32_t tmp = committed_off - reserved_off;
- uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n);
+ uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n, q->cons_single);
if (reserved_cnt > 0) {
state.state = BBQ_RESERVED;
state.e.actual_burst = reserved_cnt;
@@ -714,6 +712,8 @@ struct bbq_queue_state_s bbq_reserve_entry(struct bbq *q, struct bbq_block *bloc
bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint32_t data_type) {
size_t idx = e->off * q->entry_size;
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
if (BBQ_F_CHK_COPY_VALUE(q->flags)) {
switch (data_type) {
@@ -749,13 +749,13 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint
uint64_t allocated;
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
// TODO:优化,考虑allocated vsn溢出?考虑判断如果生产满了,直接移动head?
- allocated = bbq_atomic64_load(&e->block->allocated);
+ allocated = bbq_atomic64_load(&e->block->allocated, prod_single);
// 预留的entry所在的块,已经被新生产的数据赶上了
if (bbq_cur_vsn(q, allocated) != e->vsn) {
return false;
}
} else {
- bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst);
+ bbq_atomic64_fetch_add(&e->block->consumed, e->actual_burst, cons_single);
}
return true;
@@ -764,33 +764,38 @@ bool consume_entry(struct bbq *q, struct bbq_entry_desc *e, void *deq_data, uint
bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
uint64_t ch_idx = bbq_head_idx(q, ch);
struct bbq_block *n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
-
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
uint64_t ch_vsn = bbq_head_vsn(q, ch);
- uint64_t committed = bbq_atomic64_load(&n_blk->committed);
+ uint64_t committed = bbq_atomic64_load(&n_blk->committed, prod_single);
uint64_t committed_vsn = bbq_cur_vsn(q, committed);
+
if (BBQ_F_CHK_DROP_OLD(q->flags)) {
// 通过检查下一个块的版本是否大于或等于当前块来保证 FIFO 顺序.
// 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1
- if (committed_vsn < ver + (ch_idx == 0))
+ if (committed_vsn < ver + (ch_idx == 0)) {
return false;
- bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn));
+ }
+
+ bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn), cons_single);
} else {
if (committed_vsn != ch_vsn + 1) {
// 消费者追上了生产者,下一块还未开始生产
return false;
}
uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1);
- bbq_fetch_max(&n_blk->consumed, new_vsn);
- bbq_fetch_max(&n_blk->reserved, new_vsn);
+ bbq_fetch_max(&n_blk->consumed, new_vsn, cons_single);
+ bbq_fetch_max(&n_blk->reserved, new_vsn, cons_single);
}
- bbq_fetch_max(&q->chead.value, ch + 1);
+ bbq_fetch_max(&q->chead.value, ch + 1, cons_single);
return true;
}
/* 消息队列出队 */
static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n, uint32_t data_type, uint32_t *wait_consumed) {
uint64_t deq_update = 0;
+ bool cons_single = q->cons_single;
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || deq_data == NULL) {
bbq_errno = BBQ_ERR_INPUT_NULL;
@@ -799,7 +804,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
while (true) {
- uint64_t ch = bbq_atomic64_load(&q->chead.value);
+ uint64_t ch = bbq_atomic64_load(&q->chead.value, cons_single);
struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]);
struct bbq_queue_state_s state;
state = bbq_reserve_entry(q, blk, n);
@@ -813,7 +818,8 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
ret.actual_burst = state.e.actual_burst;
if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
- deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst) + state.e.actual_burst;
+ deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst, cons_single) +
+ state.e.actual_burst;
}
break;
case BBQ_NO_ENTRY:
@@ -1049,10 +1055,13 @@ void bbq_debug_memory_print() {
}
void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
- uint64_t allocated = bbq_atomic64_load(&block->allocated);
- uint64_t committed = bbq_atomic64_load(&block->committed);
- uint64_t reserved = bbq_atomic64_load(&block->reserved);
- uint64_t consumed = bbq_atomic64_load(&block->consumed);
+ bool prod_single = q->prod_single;
+ bool cons_single = q->cons_single;
+
+ uint64_t allocated = bbq_atomic64_load(&block->allocated, prod_single);
+ uint64_t committed = bbq_atomic64_load(&block->committed, prod_single);
+ uint64_t reserved = bbq_atomic64_load(&block->reserved, cons_single);
+ uint64_t consumed = bbq_atomic64_load(&block->consumed, cons_single);
printf(" allocated:%lu(ver:%lu) committed:%lu(ver:%lu) reserved:%lu(ver:%lu)",
bbq_cur_off(q, allocated), bbq_cur_vsn(q, allocated),
bbq_cur_off(q, committed), bbq_cur_vsn(q, committed),
@@ -1066,8 +1075,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
void bbq_debug_struct_print(struct bbq *q) {
printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new");
- uint64_t phead = bbq_atomic64_load(&q->phead.value);
- uint64_t chead = bbq_atomic64_load(&q->chead.value);
+ uint64_t phead = bbq_atomic64_load(&q->phead.value, q->prod_single);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value, q->cons_single);
printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));