summaryrefslogtreecommitdiff
path: root/bbq/src
diff options
context:
space:
mode:
Diffstat (limited to 'bbq/src')
-rw-r--r--bbq/src/bbq.c42
1 files changed, 21 insertions, 21 deletions
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index 90bcdbe..b56c6cc 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-07-01 05:20:02
+ * @LastEditTime: 2024-07-01 23:13:37
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -124,7 +124,7 @@ void *bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t
bbq_memory_g.malloc_cnt++;
bbq_memory_g.malloc_size += size;
#endif
- return malloc(size);
+ return aligned_alloc(BBQ_CACHE_LINE, size);
}
void bbq_free_def_callback(void *ptr,
@@ -361,10 +361,10 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
q->entry_size = obj_size;
q->socket_id = socket_id;
if (BBQ_F_CHK_SP_ENQ(flags)) {
- q->phead.single = true;
+ q->phead.value.single = true;
}
if (BBQ_F_CHK_SC_DEQ(flags)) {
- q->chead.single = true;
+ q->chead.value.single = true;
}
q->flags = flags;
q->malloc_f = malloc_f;
@@ -607,13 +607,13 @@ enum bbq_queue_state advance_phead(struct bbq *q, uint64_t ph) {
bbq_fetch_max(&n_blk->allocated, new_vsn);
// ph+1,当超过索引范围,进入下一轮时,version会自动+1
- bbq_fetch_max(&q->phead, ph + 1);
+ bbq_fetch_max(&q->phead.value, ph + 1);
return BBQ_SUCCESS;
}
-static uint32_t bbq_wait_consumed_get_by_stat(struct bbq *q, uint64_t enq_update, uint64_t deq_update) {
- uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->stat.n_enq) : enq_update;
- uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->stat.n_deq) : deq_update;
+static uint32_t bbq_wait_consumed_get(struct bbq *q, uint64_t enq_update, uint64_t deq_update) {
+ uint64_t enq_now = enq_update == 0 ? bbq_atomic64_load(&q->phead.count) : enq_update;
+ uint64_t deq_now = deq_update == 0 ? bbq_atomic64_load(&q->chead.count) : deq_update;
return enq_now - deq_now;
}
@@ -627,13 +627,13 @@ __attribute__((unused)) static uint32_t bbq_wait_consumed_get_by_head(struct bbq
if (ch_ptr != NULL) {
ch = *ch_ptr;
} else {
- ch = bbq_atomic64_load(&q->chead);
+ ch = bbq_atomic64_load(&q->chead.value);
}
if (ph_ptr != NULL) {
ph = *ph_ptr;
} else {
- ph = bbq_atomic64_load(&q->phead);
+ ph = bbq_atomic64_load(&q->phead.value);
}
uint64_t ph_idx = bbq_head_idx(q, ph);
@@ -679,7 +679,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
while (true) {
- uint64_t ph = bbq_atomic64_load(&q->phead);
+ uint64_t ph = bbq_atomic64_load(&q->phead.value);
struct bbq_queue_state_s state = bbq_allocate_entry(q, ph, n);
switch (state.state) {
@@ -689,7 +689,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
ret.status = BBQ_OK;
if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
- enq_update = bbq_atomic64_fetch_add(&q->stat.n_enq, state.e.actual_burst) + state.e.actual_burst;
+ enq_update = bbq_atomic64_fetch_add(&q->phead.count, state.e.actual_burst) + state.e.actual_burst;
}
break;
case BBQ_BLOCK_DONE: {
@@ -718,7 +718,7 @@ static struct bbq_status __bbq_enqueue(struct bbq *q, void const *data, uint32_t
}
if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) {
- *wait_consumed = bbq_wait_consumed_get_by_stat(q, enq_update, 0);
+ *wait_consumed = bbq_wait_consumed_get(q, enq_update, 0);
}
return ret;
@@ -891,7 +891,7 @@ bool advance_chead(struct bbq *q, uint64_t ch, uint64_t ver) {
bbq_fetch_max(&n_blk->reserved, new_vsn);
}
- bbq_fetch_max(&q->chead, ch + 1);
+ bbq_fetch_max(&q->chead.value, ch + 1);
return true;
}
@@ -906,7 +906,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
while (true) {
- uint64_t ch = bbq_atomic64_load(&q->chead);
+ uint64_t ch = bbq_atomic64_load(&q->chead.value);
struct bbq_block *blk = &(q->blocks[bbq_head_idx(q, ch)]);
struct bbq_queue_state_s state;
state = bbq_reserve_entry(q, blk, n);
@@ -920,7 +920,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
ret.actual_burst = state.e.actual_burst;
if (BBQ_F_CHK_STAT_ENABLE(q->flags)) {
- deq_update = bbq_atomic64_fetch_add(&q->stat.n_deq, state.e.actual_burst) + state.e.actual_burst;
+ deq_update = bbq_atomic64_fetch_add(&q->chead.count, state.e.actual_burst) + state.e.actual_burst;
}
break;
case BBQ_NO_ENTRY:
@@ -945,7 +945,7 @@ static struct bbq_status __bbq_dequeue(struct bbq *q, void *deq_data, uint32_t n
}
if (BBQ_F_CHK_STAT_ENABLE(q->flags) && wait_consumed != NULL) {
- *wait_consumed = bbq_wait_consumed_get_by_stat(q, 0, deq_update);
+ *wait_consumed = bbq_wait_consumed_get(q, 0, deq_update);
}
return ret;
@@ -1082,8 +1082,8 @@ static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq *q, void *const *ob
}
bool bbq_empty(struct bbq *q) {
- uint64_t phead = bbq_atomic64_load(&q->phead);
- uint64_t chead = bbq_atomic64_load(&q->chead);
+ uint64_t phead = bbq_atomic64_load(&q->phead.value);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value);
uint64_t ph_vsn = bbq_head_vsn(q, phead);
uint64_t ch_vsn = bbq_head_vsn(q, chead);
@@ -1209,8 +1209,8 @@ void bbq_debug_block_print(struct bbq *q, struct bbq_block *block) {
void bbq_debug_struct_print(struct bbq *q) {
printf("-----bbq:%s-----\n", BBQ_F_CHK_DROP_OLD(q->flags) ? "drop old" : "retry new");
- uint64_t phead = bbq_atomic64_load(&q->phead);
- uint64_t chead = bbq_atomic64_load(&q->chead);
+ uint64_t phead = bbq_atomic64_load(&q->phead.value);
+ uint64_t chead = bbq_atomic64_load(&q->chead.value);
printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));