summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘煜 <[email protected]>2024-06-14 02:22:54 +0000
committer刘煜 <[email protected]>2024-06-14 02:22:54 +0000
commit557417d67aa0eab4fa169e41c1ec1ee53bb3086b (patch)
tree3f477297451e967a7218e686b14e1ea3b14648c6
parent3a47a560498d4a12d06f364593f12f1c5e72b302 (diff)
burst功能完成
-rw-r--r--bbq/include/bbq.h48
-rw-r--r--bbq/src/bbq.c550
-rw-r--r--bbq/tests/common/test_queue.c122
-rw-r--r--bbq/tests/common/test_queue.h6
-rw-r--r--bbq/tests/unittest/ut_data.cc16
-rw-r--r--bbq/tests/unittest/ut_example.cc138
-rw-r--r--bbq/tests/unittest/ut_head_cursor.cc55
-rw-r--r--bbq/tests/unittest/ut_mix.cc2
-rw-r--r--perf/benchmark/bcm_benchmark.c2
-rw-r--r--perf/benchmark/bcm_queue.c14
-rwxr-xr-xperf/benchmark/benchmark.sh63
11 files changed, 601 insertions, 415 deletions
diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h
index d08b316..7877cce 100644
--- a/bbq/include/bbq.h
+++ b/bbq/include/bbq.h
@@ -1,6 +1,6 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-06-07 16:30:54
+ * @LastEditTime: 2024-06-14 09:27:35
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
@@ -28,13 +28,15 @@ using bbq_head = std::atomic<uint64_t>;
using aotmic_uint64 = std::atomic<uint64_t>;
#endif
+#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64)))
+
typedef struct {
bbq_cursor committed; // 已提交(version|offset)
bbq_cursor allocated; // 已分配(version|offset)
bbq_cursor reserved; // 已预留(version|offset)
bbq_cursor consumed; // 已消费(version|offset), 在drop-old模式下没用
- char *entries; // BS大小的动态数组
-} bbq_block_s;
+ char *entries;
+} __BBQ_CACHE_ALIGNED bbq_block_s;
typedef enum {
BBQ_SUCCESS = 0,
@@ -46,15 +48,16 @@ typedef enum {
} bbq_queue_state_e;
typedef struct {
- bbq_block_s *block; // 指向所在的block
- uint64_t bbq_off; // entry在当前block的偏移(offset)
- uint64_t vsn; // allocated游标的版本(version)
+ uint64_t off; // entry在当前block的偏移(offset)
+ uint64_t vsn; // allocated游标的版本(vsn)
+ uint32_t actual_burst; // 实际出入队个数
+ bbq_block_s *block; // 指向所在的block
} bbq_entry_desc_s;
typedef struct {
bbq_queue_state_e state; // 队列状态
union {
- uint64_t vsn; // state==BLOCK_DONE时生效
+ uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效
bbq_entry_desc_s e; // state为ALLOCATED、RESERVED生效
};
} bbq_queue_state_s;
@@ -73,19 +76,21 @@ typedef struct {
#define BBQ_COPY_POINTER(flags) (!(flags & BBQ_F_COPY_VALUE))
typedef struct {
- int socket_id; // 在哪个socket_id上使用libnuma分配内存,-1表示无效,使用malloc分配
- size_t bs; // 每个block里entries成员的大小
- size_t bn; // blocks个数
+ size_t bs; // 每个block里entries成员的大小
+ size_t bn; // blocks个数
size_t obj_size;
size_t entry_size;
- unsigned int flags;
- unsigned int idx_bits;
- unsigned int off_bits;
+
+ int32_t socket_id; // 在哪个socket_id上使用libnuma分配内存,-1表示无效,使用malloc分配
+ uint32_t flags;
+ uint32_t idx_bits;
+ uint32_t off_bits;
+
uint64_t idx_mask;
uint64_t off_mask;
+ bbq_head phead; // 生产者头,指向块的索引(version|idx)
+ bbq_head chead; // 消费者头,指向块的索引(version|idx)
- bbq_head phead; // 生产者头,指向块的索引(version|idx)
- bbq_head chead; // 消费者头,指向块的索引(version|idx)
bbq_block_s *blocks; // bn大小的动态数组
} bbq_queue_s;
@@ -161,26 +166,21 @@ extern int bbq_dequeue(bbq_queue_s *q, void *deq_data);
* @return
* 实际出队个数
*/
-extern uint32_t bbq_dequeue_burst(bbq_queue_s *q, void *obj_table, uint32_t n);
-
-// flags 第一位控制传入的是一维数组还是二维数组
-#define BBQ_F_ARRAY_1D 0x0001
-#define BBQ_F_ARRAY_2D BBQ_F_DEFAULT
-#define BBQ_CHK_ARRAY_1D(flags) (flags & BBQ_F_ARRAY_1D)
-#define BBQ_CHK_ARRAY_2D(flags) (!(flags & BBQ_F_ARRAY_1D))
+extern uint32_t bbq_dequeue_burst_one_dimensional(bbq_queue_s *q, void *obj_table, uint32_t n);
+extern uint32_t bbq_dequeue_burst_two_dimensional(bbq_queue_s *q, void **obj_table, uint32_t n);
/**
* 尝试一次入队多个数据,直到达到最大数量,或是入队失败
*
* @param q
* 队列指针
* @param obj_table
- * @param flags
* @param n
* obj_table的成员个数
* @return
* 实际入队个数
*/
-uint32_t bbq_enqueue_burst(bbq_queue_s *q, void *obj_table, size_t n, unsigned int flags);
+extern uint32_t bbq_enqueue_burst_one_dimensional(bbq_queue_s *q, void *obj_table, uint32_t n);
+extern uint32_t bbq_enqueue_burst_two_dimensional(bbq_queue_s *q, void **obj_table, uint32_t n);
// -----------------------------用于内存测试-------------------------------
// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index bfdc30f..cea7a54 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-06 12:02:08
+ * @LastEditTime: 2024-06-13 23:11:20
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -11,26 +11,36 @@
#include <stdio.h>
#include <string.h>
-extern bbq_queue_state_s allocate_entry(bbq_queue_s *q, bbq_block_s *block);
-extern void commit_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *data);
-extern bbq_queue_state_s advance_phead(bbq_queue_s *q, uint64_t ph);
-extern bbq_queue_state_s reserve_entry(bbq_queue_s *q, bbq_block_s *block);
-extern bool consume_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *deq_data);
-extern bool advance_chead(bbq_queue_s *q, uint64_t ch, uint64_t ver);
-extern inline uint64_t bbq_idx(bbq_queue_s *q, uint64_t x);
-extern inline uint64_t bbq_off(bbq_queue_s *q, uint64_t x);
-extern inline uint64_t bbq_head_vsn(bbq_queue_s *q, uint64_t x);
-extern inline uint64_t bbq_cur_vsn(bbq_queue_s *q, uint64_t x);
-extern inline uint64_t set_cur_vsn(bbq_queue_s *q, uint64_t ver);
-extern void *bbq_memset(void *data, size_t size);
-extern void *bbq_malloc(bbq_module_e module, int socket_id, size_t size);
-extern void bbq_free(bbq_module_e module, int socket_id, void *ptr, size_t size);
+struct bbq_status {
+ int32_t status; // 返回状态
+ uint32_t actual_burst; // 实际出/入队个数
+};
+
+#define BBQ_MEM_MAGIC 0xFF
+
+extern bbq_queue_state_s allocate_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n);
+extern bbq_queue_state_e advance_phead(bbq_queue_s* q, uint64_t ph);
+extern bbq_queue_state_s reserve_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n);
+extern bool consume_entry(bbq_queue_s* q, bbq_entry_desc_s* e, void* deq_data, uint32_t flag);
+extern bool advance_chead(bbq_queue_s* q, uint64_t ch, uint64_t ver);
+extern inline uint64_t bbq_idx(bbq_queue_s* q, uint64_t x);
+extern inline uint64_t bbq_off(bbq_queue_s* q, uint64_t x);
+extern inline uint64_t bbq_head_vsn(bbq_queue_s* q, uint64_t x);
+extern inline uint64_t bbq_cur_vsn(bbq_queue_s* q, uint64_t x);
+extern inline uint64_t set_cur_vsn(bbq_queue_s* q, uint64_t ver);
+extern void* bbq_memset(void* data, size_t size);
+extern void* bbq_malloc(bbq_module_e module, int socket_id, size_t size);
+extern void bbq_free(bbq_module_e module, int socket_id, void* ptr, size_t size);
+
+#ifdef BBQ_DEBUG
+extern void bbq_struct_print(bbq_queue_s* q);
+#endif
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
-uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) {
+uint64_t fetch_max(aotmic_uint64* atom, uint64_t upd) {
uint64_t old_value;
do {
- old_value = atomic_load(atom); // 读取当前值
+ old_value = atomic_load(atom); // 读取当前值
} while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd));
return old_value;
@@ -49,16 +59,25 @@ bool bbq_check_power_of_two(uint32_t n) {
* 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/
uint32_t bbq_blocks_calc(uint32_t entries) {
double log_entries = log2((double)entries);
- uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整
+ uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整
uint32_t max_value = (over4 > 1) ? over4 : 1;
uint32_t n = pow(2, max_value);
return n;
}
/* 块初始化 */
-int block_init(bbq_queue_s *q, bbq_block_s *block, bool cursor_init) {
+int block_init(bbq_queue_s* q, bbq_block_s* block, bool cursor_init) {
+#ifdef BBQ_MEMORY
+ // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题
+ block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
+ (q->bs + 1) * q->entry_size);
+ char* last_entry = block->entries + q->entry_size * q->bs;
+ memset(last_entry, BBQ_MEM_MAGIC, q->entry_size);
+#else
block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
- sizeof(*block->entries) * q->bs * q->entry_size);
+ q->bs * q->entry_size);
+#endif
+
if (block->entries == NULL) {
BBQ_ERR_LOG("bbq_malloc error");
return BBQ_ALLOC_ERR;
@@ -85,10 +104,15 @@ int block_init(bbq_queue_s *q, bbq_block_s *block, bool cursor_init) {
}
/* 块清理函数,与block_init成对*/
-void block_cleanup(bbq_queue_s *q, bbq_block_s *block) {
+void block_cleanup(bbq_queue_s* q, bbq_block_s* block) {
if (block->entries) {
+#ifdef BBQ_MEMORY
+ bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
+ block->entries, sizeof(*block->entries) * (q->bs + 1) * q->entry_size);
+#else
bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id,
block->entries, sizeof(*block->entries) * q->bs * q->entry_size);
+#endif
block->entries = NULL;
}
}
@@ -113,7 +137,7 @@ unsigned ceil_log2(uint64_t x) {
}
/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存,free_func先设置NULL */
-bbq_queue_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags) {
+bbq_queue_s* bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags) {
int ret = 0;
bool numa_enable = true;
@@ -137,7 +161,7 @@ bbq_queue_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t o
socket_id = BBQ_INVALID_SOCKET;
}
- bbq_queue_s *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q));
+ bbq_queue_s* q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q));
if (q == NULL) {
BBQ_ERR_LOG("malloc for bbq queue error");
return NULL;
@@ -148,7 +172,7 @@ bbq_queue_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t o
q->bs = bs;
q->obj_size = obj_size;
if (BBQ_COPY_POINTER(flags)) {
- q->entry_size = sizeof(uintptr_t);
+ q->entry_size = sizeof(void*);
} else {
q->entry_size = obj_size;
}
@@ -188,12 +212,12 @@ error:
}
/* 创建消息队列,bn和bs必须是2的N次幂,free_func先设置NULL */
-bbq_queue_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags) {
+bbq_queue_s* bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags) {
return bbq_ring_create_bnbs_with_socket(bn, bs, obj_size, BBQ_INVALID_SOCKET, flags);
}
/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */
-bbq_queue_s *bbq_ring_create_with_socket(uint32_t count, size_t obj_size, int socket_id, unsigned int flags) {
+bbq_queue_s* bbq_ring_create_with_socket(uint32_t count, size_t obj_size, int socket_id, unsigned int flags) {
if (bbq_check_power_of_two(count) == false || count == 1) {
BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%lu", count);
return NULL;
@@ -205,13 +229,13 @@ bbq_queue_s *bbq_ring_create_with_socket(uint32_t count, size_t obj_size, int so
}
/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,free_func先设置NULL */
-bbq_queue_s *bbq_ring_create(uint32_t count, size_t obj_size, unsigned int flags) {
+bbq_queue_s* bbq_ring_create(uint32_t count, size_t obj_size, unsigned int flags) {
// 传入无效socket_id,将使用malloc分配内存
return bbq_ring_create_with_socket(count, obj_size, BBQ_INVALID_SOCKET, flags);
}
/* 释放消息队列,与bbq_ring_create系列接口成对*/
-void bbq_ring_free(bbq_queue_s *q) {
+void bbq_ring_free(bbq_queue_s* q) {
if (q == NULL) {
return;
}
@@ -224,140 +248,278 @@ void bbq_ring_free(bbq_queue_s *q) {
bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q));
}
+// flags 第一位控制传入的是一维数组还是二维数组
+#define BBQ_F_SINGLE 0x0
+#define BBQ_F_ARRAY_1D 0x1
+#define BBQ_F_ARRAY_2D 0x2
+void commit_entry(bbq_queue_s* q, bbq_entry_desc_s* e, void* data, uint32_t flag) {
+ size_t idx = e->off * q->entry_size;
+
+ if (BBQ_COPY_POINTER(q->flags)) {
+ // 指针入队列
+ switch (flag) {
+ case BBQ_F_ARRAY_1D:
+ char* tmp = (char*)data;
+ char* entry = &(e->block->entries[idx]);
+ for (size_t i = 0; i < e->actual_burst; i++) {
+ memcpy(entry, &tmp, q->entry_size);
+ entry += q->entry_size;
+ tmp += q->obj_size;
+ }
+ break;
+ case BBQ_F_ARRAY_2D:
+ case BBQ_F_SINGLE:
+ // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0])
+ memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
+
+ void** tmp3 = (void**)(e->block->entries);
+ uint16_t* tmp2 = (uint16_t*)(tmp3[0]);
+ break;
+ default:
+ break;
+ }
+ } else {
+ // 数据入队列
+ switch (flag) {
+ case BBQ_F_ARRAY_1D:
+ case BBQ_F_SINGLE:
+ memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst);
+ break;
+ case BBQ_F_ARRAY_2D:
+ void** tmp = (void**)data;
+ char* entry = &(e->block->entries[idx]);
+ for (size_t i = 0; i < e->actual_burst; i++) {
+ memcpy(entry, *tmp, q->entry_size);
+ entry += q->entry_size;
+ tmp++;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ atomic_fetch_add(&e->block->committed, e->actual_burst);
+}
+
/* 消息队列入队 */
-int bbq_enqueue(bbq_queue_s *q, void *data) {
+static struct bbq_status __bbq_enqueue(bbq_queue_s* q, void* data, uint32_t n, uint32_t flag) {
+ struct bbq_status ret = {.status = 0, .actual_burst = 0};
+
if (q == NULL || data == NULL) {
- return BBQ_NULL_PTR;
+ ret.status = BBQ_NULL_PTR;
+ return ret;
}
while (true) {
// 获取当前phead,转为索引后获取到当前的blk
uint64_t ph = atomic_load(&q->phead);
- bbq_block_s *blk = &(q->blocks[bbq_idx(q, ph)]);
-
- bbq_queue_state_s ps;
- bbq_queue_state_s state = allocate_entry(q, blk);
+ bbq_block_s* blk = &(q->blocks[bbq_idx(q, ph)]);
+ bbq_queue_state_s state = allocate_entry(q, blk, n);
switch (state.state) {
- case BBQ_ALLOCATED:
- commit_entry(q, &state.e, data);
- return BBQ_OK;
- case BBQ_BLOCK_DONE:
- ps = advance_phead(q, ph);
- switch (ps.state) {
- case BBQ_NO_ENTRY:
- return BBQ_QUEUE_FULL;
- case BBQ_NOT_AVAILABLE:
- return BBQ_QUEUE_BUSY;
- case BBQ_SUCCESS:
- continue;
- }
- break;
- default:
- BBQ_ERR_LOG("Invalid QueueState in bbq_enqueue: %d", state.state);
- return BBQ_ERROR;
+ case BBQ_ALLOCATED:
+ commit_entry(q, &state.e, data, flag);
+ ret.actual_burst = state.e.actual_burst;
+ ret.status = BBQ_OK;
+ return ret;
+ case BBQ_BLOCK_DONE:
+ bbq_queue_state_e pstate = advance_phead(q, ph);
+ switch (pstate) {
+ case BBQ_NO_ENTRY:
+ ret.status = BBQ_QUEUE_FULL;
+ return ret;
+ case BBQ_NOT_AVAILABLE:
+ ret.status = BBQ_QUEUE_BUSY;
+ return ret;
+ case BBQ_SUCCESS:
+ continue;
+ }
+ break;
+ default:
+ BBQ_DBG_LOG("Invalid QueueState in bbq_enqueue: %d", state.state);
+ ret.status = BBQ_ERROR;
+ return ret;
}
}
}
+int bbq_enqueue(bbq_queue_s* q, void* data) {
+ struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_F_SINGLE);
+ return ret.status;
+}
+
/* 消息队列出队 */
-int bbq_dequeue(bbq_queue_s *q, void *deq_data) {
+static struct bbq_status __bbq_dequeue(bbq_queue_s* q, void* deq_data, uint32_t n, uint32_t flag) {
+ struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || deq_data == NULL) {
- return BBQ_NULL_PTR;
+ ret.status = BBQ_NULL_PTR;
+ return ret;
}
while (true) {
uint64_t ch = atomic_load(&q->chead);
- bbq_block_s *blk = &(q->blocks[bbq_idx(q, ch)]);
+ bbq_block_s* blk = &(q->blocks[bbq_idx(q, ch)]);
bbq_queue_state_s state;
- state = reserve_entry(q, blk);
+ state = reserve_entry(q, blk, n);
switch (state.state) {
- case BBQ_RESERVED:
- if (consume_entry(q, &state.e, deq_data)) {
- return BBQ_OK;
- } else {
- continue;
- }
- case BBQ_NO_ENTRY:
- return BBQ_QUEUE_EMPTY;
- case BBQ_NOT_AVAILABLE:
- return BBQ_QUEUE_BUSY;
- case BBQ_BLOCK_DONE:
- if (advance_chead(q, ch, state.vsn)) {
- continue;
- } else {
- return BBQ_QUEUE_EMPTY;
- }
- default:
- BBQ_ERR_LOG("Invalid QueueState in dequeue state: %d", state.state);
- return BBQ_ERROR;
+ case BBQ_RESERVED:
+ if (consume_entry(q, &state.e, deq_data, flag)) {
+ ret.status = BBQ_OK;
+ ret.actual_burst = state.e.actual_burst;
+ return ret;
+ } else {
+ continue;
+ }
+ case BBQ_NO_ENTRY:
+ ret.status = BBQ_QUEUE_EMPTY;
+ return ret;
+ case BBQ_NOT_AVAILABLE:
+ ret.status = BBQ_QUEUE_BUSY;
+ return ret;
+ case BBQ_BLOCK_DONE:
+ if (advance_chead(q, ch, state.vsn)) {
+ continue;
+ } else {
+ ret.status = BBQ_QUEUE_EMPTY;
+ return ret;
+ }
+ default:
+ BBQ_DBG_LOG("Invalid QueueState in dequeue state: %d", state.state);
+ ret.status = BBQ_ERROR;
+ return ret;
}
}
}
-/* 将多个对象从一个环形队列ring取出,直到达到最大数量,或是出队失败 */
-uint32_t bbq_dequeue_burst(bbq_queue_s *q, void *obj_table, uint32_t n) {
+int bbq_dequeue(bbq_queue_s* q, void* deq_data) {
+ struct bbq_status ret = __bbq_dequeue(q, deq_data, 1, BBQ_F_SINGLE);
+ return ret.status;
+}
+
+uint32_t bbq_max_burst(bbq_queue_s* q, uint32_t n) {
+ uint32_t burst = n;
+ if (burst > q->bs) {
+ burst = q->bs;
+ }
+
+ return burst;
+}
+
+uint32_t bbq_dequeue_burst_one_dimensional(bbq_queue_s* q, void* obj_table, uint32_t n) {
if (q == NULL || obj_table == NULL) {
return BBQ_NULL_PTR;
}
- uint32_t cnt = 0;
- int ret = 0;
- void *obj = NULL;
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void* obj = obj_table;
+ struct bbq_status ret = {0};
- for (cnt = 0; cnt < n; cnt++) {
- if (BBQ_COPY_VALUE(q->flags)) {
- obj = (obj == NULL ? obj_table : obj + q->obj_size);
- } else {
- obj = &((void **)obj_table)[cnt];
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_dequeue(q, obj, burst, BBQ_F_ARRAY_1D);
+ if (ret.status != BBQ_OK) {
+ break;
}
+ obj += q->obj_size * ret.actual_burst;
+ ready += ret.actual_burst;
- ret = bbq_dequeue(q, obj);
- if (ret != BBQ_OK) {
- return cnt;
+ // bbq_struct_print(q);
+ }
+
+ return ready;
+}
+
+uint32_t bbq_dequeue_burst_two_dimensional(bbq_queue_s* q, void** obj_table, uint32_t n) {
+ if (q == NULL || obj_table == NULL) {
+ return BBQ_NULL_PTR;
+ }
+
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void** obj_table_tmp = obj_table;
+ struct bbq_status ret = {0};
+
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D);
+ if (ret.status != BBQ_OK) {
+ break;
}
+ obj_table_tmp += ret.actual_burst;
+ ready += ret.actual_burst;
+
+ // bbq_struct_print(q);
}
- return cnt;
+ return ready;
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst(bbq_queue_s *q, void *obj_table, size_t n, unsigned int flags) {
+uint32_t bbq_enqueue_burst_one_dimensional(bbq_queue_s* q, void* obj_table, uint32_t n) {
if (q == NULL || obj_table == NULL) {
return BBQ_NULL_PTR;
}
- uint32_t cnt = 0;
- void *obj = NULL;
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void* obj = obj_table;
+ struct bbq_status ret = {0};
- for (cnt = 0; cnt < n; cnt++) {
- if (BBQ_CHK_ARRAY_1D(flags)) {
- obj = (obj == NULL ? obj_table : obj + q->obj_size);
- } else {
- obj = ((void **)obj_table)[cnt];
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_enqueue(q, obj, burst, BBQ_F_ARRAY_1D);
+ if (ret.status != BBQ_OK) {
+ break;
}
+ obj += q->obj_size * ret.actual_burst;
+ ready += ret.actual_burst;
- if (BBQ_OK != bbq_enqueue(q, obj)) {
- return cnt;
+ // bbq_struct_print(q);
+ }
+
+ return ready;
+}
+
+/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
+uint32_t bbq_enqueue_burst_two_dimensional(bbq_queue_s* q, void** obj_table, uint32_t n) {
+ if (q == NULL || obj_table == NULL) {
+ return BBQ_NULL_PTR;
+ }
+
+ uint32_t burst = 0;
+ uint32_t ready = 0;
+ void** obj_table_tmp = obj_table;
+ struct bbq_status ret = {0};
+
+ while (ready < n) {
+ burst = bbq_max_burst(q, n - ready);
+ ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D);
+ if (ret.status != BBQ_OK) {
+ break;
}
+ obj_table_tmp += ret.actual_burst;
+ ready += ret.actual_burst;
+
+ // bbq_struct_print(q);
}
- return cnt;
+ return ready;
}
-bbq_queue_state_s allocate_entry(bbq_queue_s *q, bbq_block_s *block) {
+bbq_queue_state_s allocate_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n) {
bbq_queue_state_s state = {0};
if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) {
state.state = BBQ_BLOCK_DONE;
return state;
}
- uint64_t old = atomic_fetch_add(&block->allocated, 1);
+ uint64_t old = atomic_fetch_add(&block->allocated, n); // burst
uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed));
- // committed_vsn,在当前块被初始化后值是不变的,判断vsn是考虑到极限情况下给off预留的空间不足导致,allocated的off溢出(vsn+1)
+ // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1
uint64_t cur_vsn = bbq_cur_vsn(q, old);
uint64_t cur_off = bbq_off(q, old);
if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) {
@@ -365,50 +527,43 @@ bbq_queue_state_s allocate_entry(bbq_queue_s *q, bbq_block_s *block) {
return state;
}
- state.state = BBQ_ALLOCATED;
+ if (cur_off + n <= q->bs) {
+ // 可以全部入队
+ state.e.actual_burst = n;
+ } else {
+ // 部分入队
+ state.e.actual_burst = q->bs - cur_off;
+ }
state.e.block = block;
state.e.vsn = cur_vsn;
- state.e.bbq_off = cur_off;
+ state.e.off = cur_off;
+ state.state = BBQ_ALLOCATED;
return state;
}
-void commit_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *data) {
- size_t idx = e->bbq_off * q->entry_size;
- if (BBQ_COPY_POINTER(q->flags)) {
- uintptr_t *uptr = (uintptr_t *)(&(e->block->entries[idx]));
- *uptr = (uintptr_t)(data);
- } else {
- memcpy(&(e->block->entries[idx]), data, q->entry_size);
- }
- atomic_fetch_add(&e->block->committed, 1);
-}
-
-bbq_queue_state_s advance_phead(bbq_queue_s *q, uint64_t ph) {
+bbq_queue_state_e advance_phead(bbq_queue_s* q, uint64_t ph) {
// 获取下一个block
- bbq_block_s *n_blk;
+ bbq_block_s* n_blk;
n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]);
uint64_t cur, reserved;
- bbq_queue_state_s state = {0};
if (BBQ_POLIC_RETRY_NEW(q->flags)) {
cur = atomic_load(&n_blk->consumed);
- if (bbq_cur_vsn(q, cur) < bbq_head_vsn(q, ph) || // 生产者赶上了消费者
+ if (bbq_cur_vsn(q, cur) < bbq_head_vsn(q, ph) || // 生产者赶上了消费者
(bbq_cur_vsn(q, cur) == bbq_head_vsn(q, ph) && bbq_off(q, cur) != q->bs)) {
reserved = atomic_load(&n_blk->reserved);
if (bbq_off(q, reserved) == bbq_off(q, cur)) {
- state.state = BBQ_NO_ENTRY;
+ return BBQ_NO_ENTRY;
} else {
- state.state = BBQ_NOT_AVAILABLE;
+ return BBQ_NOT_AVAILABLE;
}
- return state;
}
} else {
cur = atomic_load(&n_blk->committed);
// 生产者避免前进到上一轮中尚未完全提交的区块
if (bbq_cur_vsn(q, cur) == bbq_head_vsn(q, ph) && bbq_off(q, cur) != q->bs) {
- state.state = BBQ_NOT_AVAILABLE;
- return state;
+ return BBQ_NOT_AVAILABLE;
}
}
@@ -418,44 +573,64 @@ bbq_queue_state_s advance_phead(bbq_queue_s *q, uint64_t ph) {
// 索引+1,当超过索引范围,也就是循环下一轮块时,version+1
fetch_max(&q->phead, ph + 1);
- state.state = BBQ_SUCCESS;
- return state;
+ return BBQ_SUCCESS;
+}
+
+/* 更新成功 reserve成功的个数 */
+uint32_t reserve_update(bbq_cursor* aotmic, uint64_t reserved, uint32_t n) {
+ if (n == 1) {
+ // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
+ if (fetch_max(aotmic, reserved + 1) == reserved) {
+ return 1;
+ }
+
+ return 0;
+ } else {
+ bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n);
+ return ret == true ? n : 0;
+ }
}
-bbq_queue_state_s reserve_entry(bbq_queue_s *q, bbq_block_s *block) {
+bbq_queue_state_s reserve_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n) {
while (true) {
bbq_queue_state_s state;
uint64_t reserved = atomic_load(&block->reserved);
- if (bbq_off(q, reserved) < q->bs) {
+ uint64_t reserved_off = bbq_off(q, reserved);
+ uint64_t reserved_svn = bbq_cur_vsn(q, reserved);
+
+ if (reserved_off < q->bs) {
uint64_t consumed = atomic_load(&block->consumed);
- if (BBQ_POLIC_RETRY_NEW(q->flags) && bbq_cur_vsn(q, reserved) != bbq_cur_vsn(q, consumed)) {
+ if (BBQ_POLIC_RETRY_NEW(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) {
// consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到
state.state = BBQ_BLOCK_DONE;
- state.vsn = bbq_cur_vsn(q, reserved);
+ state.vsn = reserved_svn;
return state;
}
uint64_t committed = atomic_load(&block->committed);
- if (bbq_off(q, committed) == bbq_off(q, reserved)) { // TODO:多entry关注
+ uint64_t committed_off = bbq_off(q, committed);
+ if (committed_off == reserved_off) { // TODO:多entry关注
state.state = BBQ_NO_ENTRY;
return state;
}
// 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据
- if (bbq_off(q, committed) != q->bs) {
+ if (committed_off != q->bs) {
uint64_t allocated = atomic_load(&block->allocated);
- if (bbq_off(q, allocated) != bbq_off(q, committed)) {
+ if (bbq_off(q, allocated) != committed_off) {
state.state = BBQ_NOT_AVAILABLE;
return state;
}
}
- if (fetch_max(&block->reserved, reserved + 1) == reserved) { // TODO:多entry时关注
- // fetch_max返回的是旧值,因此预期返回的旧值要等于局部变量reserved
+ int32_t tmp = committed_off - reserved_off;
+ uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n);
+ if (reserved_cnt > 0) { // TODO:多entry时关注
state.state = BBQ_RESERVED;
+ state.e.actual_burst = reserved_cnt;
state.e.block = block;
- state.e.bbq_off = bbq_off(q, reserved);
- state.e.vsn = bbq_cur_vsn(q, reserved);
+ state.e.off = reserved_off;
+ state.e.vsn = reserved_svn;
return state;
} else {
@@ -465,23 +640,47 @@ bbq_queue_state_s reserve_entry(bbq_queue_s *q, bbq_block_s *block) {
}
state.state = BBQ_BLOCK_DONE;
- state.vsn = bbq_cur_vsn(q, reserved);
+ state.vsn = reserved_svn;
return state;
}
}
-bool consume_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *deq_data) {
- size_t idx = e->bbq_off * q->entry_size;
+bool consume_entry(bbq_queue_s* q, bbq_entry_desc_s* e, void* deq_data, uint32_t flag) {
+ size_t idx = e->off * q->entry_size;
+
if (BBQ_COPY_POINTER(q->flags)) {
- uintptr_t *uptr = (uintptr_t *)(&(e->block->entries[idx]));
- *((void **)deq_data) = (void *)(*uptr);
+ switch (flag) {
+ case BBQ_F_ARRAY_2D:
+ case BBQ_F_SINGLE:
+ memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
+ break;
+ case BBQ_F_ARRAY_1D:
+ default:
+ break;
+ }
} else {
- memcpy(deq_data, &(e->block->entries[idx]), q->entry_size);
+ switch (flag) {
+ case BBQ_F_ARRAY_1D:
+ case BBQ_F_SINGLE:
+ memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst);
+ break;
+ case BBQ_F_ARRAY_2D:
+ void** tmp = (void**)deq_data;
+ char* entry = &(e->block->entries[idx]);
+ for (size_t i = 0; i < e->actual_burst; i++) {
+ memcpy(*tmp, entry, q->entry_size);
+ entry += q->entry_size;
+ tmp++;
+ }
+ break;
+ default:
+ break;
+ }
}
uint64_t allocated;
if (BBQ_POLIC_RETRY_NEW(q->flags)) {
- atomic_fetch_add(&e->block->consumed, 1);
+ atomic_fetch_add(&e->block->consumed, e->actual_burst);
} else {
allocated = atomic_load(&e->block->allocated);
// 预留的entry所在的块,已经被新生产的数据赶上了
@@ -493,8 +692,8 @@ bool consume_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *deq_data) {
return true;
}
-bool advance_chead(bbq_queue_s *q, uint64_t ch, uint64_t ver) {
- bbq_block_s *n_blk = &(q->blocks[(bbq_idx(q, ch) + 1) & q->idx_mask]);
+bool advance_chead(bbq_queue_s* q, uint64_t ch, uint64_t ver) {
+ bbq_block_s* n_blk = &(q->blocks[(bbq_idx(q, ch) + 1) & q->idx_mask]);
uint64_t committed = atomic_load(&n_blk->committed);
if (BBQ_POLIC_RETRY_NEW(q->flags)) {
@@ -516,27 +715,27 @@ bool advance_chead(bbq_queue_s *q, uint64_t ch, uint64_t ver) {
return true;
}
-inline uint64_t bbq_idx(bbq_queue_s *q, uint64_t x) {
+inline uint64_t bbq_idx(bbq_queue_s* q, uint64_t x) {
return x & q->idx_mask;
}
-inline uint64_t bbq_off(bbq_queue_s *q, uint64_t x) {
+inline uint64_t bbq_off(bbq_queue_s* q, uint64_t x) {
return x & q->off_mask;
}
-inline uint64_t bbq_head_vsn(bbq_queue_s *q, uint64_t x) {
+inline uint64_t bbq_head_vsn(bbq_queue_s* q, uint64_t x) {
return x >> q->idx_bits;
}
-inline uint64_t bbq_cur_vsn(bbq_queue_s *q, uint64_t x) {
+inline uint64_t bbq_cur_vsn(bbq_queue_s* q, uint64_t x) {
return x >> q->off_bits;
}
-inline uint64_t set_cur_vsn(bbq_queue_s *q, uint64_t ver) {
+inline uint64_t set_cur_vsn(bbq_queue_s* q, uint64_t ver) {
return ver << q->off_bits;
}
-void *bbq_memset(void *data, size_t size) {
+void* bbq_memset(void* data, size_t size) {
if (data != NULL && size > 0) {
memset(data, 0, size);
}
@@ -552,8 +751,8 @@ typedef struct {
bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0};
#endif
-void *bbq_malloc(bbq_module_e module, int socket_id, size_t size) {
- void *ptr = NULL;
+void* bbq_malloc(bbq_module_e module, int socket_id, size_t size) {
+ void* ptr = NULL;
if (socket_id >= 0) {
ptr = numa_alloc_onnode(size, 0);
} else {
@@ -569,7 +768,7 @@ void *bbq_malloc(bbq_module_e module, int socket_id, size_t size) {
return ptr;
}
-void bbq_free(bbq_module_e module, int socket_id, void *ptr, size_t size) {
+void bbq_free(bbq_module_e module, int socket_id, void* ptr, size_t size) {
#ifdef BBQ_MEMORY
if (ptr != NULL) {
atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1);
@@ -598,7 +797,7 @@ bool bbq_malloc_free_equal() {
uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size);
uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size);
if (malloc_size != free_size) {
- BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free size not equal\n", i, malloc_cnt, free_cnt);
+ BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, free_size);
ret = false;
}
}
@@ -608,6 +807,22 @@ bool bbq_malloc_free_equal() {
#endif
}
+bool bbq_check_array_bounds(bbq_queue_s* q) {
+#ifdef BBQ_MEMORY
+ void* value = malloc(q->entry_size);
+ memset(value, BBQ_MEM_MAGIC, q->entry_size);
+
+ for (size_t i = 0; i < q->bn; i++) {
+ // 针对内存检查版本,申请了bs+1个entry
+ char* last_entry = q->blocks[i].entries + q->bs * q->entry_size;
+ if (memcmp(last_entry, value, q->entry_size) != 0) {
+ return false;
+ }
+ }
+#endif
+ return true;
+}
+
void bbq_memory_info() {
#ifdef BBQ_MEMORY
for (int i = 0; i < BBQ_MODULE_MAX; i++) {
@@ -628,4 +843,25 @@ void bbq_memory_info() {
BBQ_ERR_LOG("memory not all free");
}
#endif
-} \ No newline at end of file
+}
+
+#ifdef BBQ_DEBUG
+void bbq_block_print(bbq_queue_s* q, bbq_block_s* block) {
+ bbq_cursor allocated = atomic_load(&block->allocated);
+ bbq_cursor committed = atomic_load(&block->committed);
+ bbq_cursor reserved = atomic_load(&block->reserved);
+ bbq_cursor consumed = atomic_load(&block->consumed);
+ printf(" allocated:%lu\n", bbq_off(q, allocated));
+ printf(" committed:%lu\n", bbq_off(q, committed));
+ printf(" reserved:%lu\n", bbq_off(q, reserved));
+ printf(" consumed:%lu\n\n", bbq_off(q, consumed));
+}
+
+void bbq_struct_print(bbq_queue_s* q) {
+ printf("ph idx:%lu vsn:%lu\n", bbq_idx(q, q->phead), bbq_head_vsn(q, q->phead));
+ bbq_block_print(q, &(q->blocks[bbq_idx(q, q->phead)]));
+
+ // printf("ch idx:%lu vsn:%lu", bbq_idx(q, q->chead), bbq_head_vsn(q, q->chead));
+ // bbq_block_print(q, &(q->blocks[bbq_idx(q, q->chead)]));
+}
+#endif \ No newline at end of file
diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c
index d8787e5..6b57df7 100644
--- a/bbq/tests/common/test_queue.c
+++ b/bbq/tests/common/test_queue.c
@@ -1,18 +1,23 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-11 11:46:12
+ * @LastEditTime: 2024-06-13 23:25:09
* @Describe: TODO
*/
#include "test_queue.h"
-#include "bbq.h"
-#include "test_mix.h"
#include <sys/prctl.h>
#include <unistd.h>
+#include "bbq.h"
+#include "test_mix.h"
+extern bool bbq_check_array_bounds(bbq_queue_s* q);
-int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
- bbq_queue_s *ring;
+uint32_t test_bbq_enqueue_burst(void* ring, void* obj_table, uint32_t n, uint16_t thread_idx) {
+ return bbq_enqueue_burst_two_dimensional(ring, obj_table, n);
+}
+
+int test_queue_init_bbq(test_cfg* cfg, test_queue_s* q) {
+ bbq_queue_s* ring;
size_t obj_size = sizeof(test_data);
if (cfg->ring.block_count == 0) {
@@ -29,35 +34,34 @@ int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) {
return BBQ_NULL_PTR;
}
- bbq_queue_s *bbq_ring = (bbq_queue_s *)q->ring;
+ bbq_queue_s* bbq_ring = (bbq_queue_s*)q->ring;
BBQ_DBG_LOG("block number:%lu size:%lu", bbq_ring->bn, bbq_ring->bs);
q->ring_free_f = (test_ring_free_f)bbq_ring_free;
q->enqueue_f = (test_ring_enqueue_f)bbq_enqueue;
q->dequeue_f = (test_ring_dequeue_f)bbq_dequeue;
- q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst;
- q->enqueue_burst_f = (test_enqueue_burst_f)bbq_enqueue_burst;
-
+ q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst_two_dimensional;
+ q->enqueue_burst_f = (test_enqueue_burst_f)test_bbq_enqueue_burst;
return 0;
}
-void test_queue_destory(test_queue_s *q) {
+void test_queue_destory(test_queue_s* q) {
if (q != NULL && q->ring_free_f != NULL) {
q->ring_free_f(q->ring);
}
}
-bool test_all_producer_exit(test_info_s *test_info) {
+bool test_all_producer_exit(test_info_s* test_info) {
return atomic_load(&test_info->ctl.producer_exit) == test_info->cfg.ring.producer_cnt;
}
-void test_wait_all_threads_ready(test_ctl *ctl) {
+void test_wait_all_threads_ready(test_ctl* ctl) {
pthread_barrier_wait(&ctl->all_threads_start);
BBQ_DBG_LOG("thread init done!");
}
-test_exit_data *test_exit_data_create(test_thread_arg_s *t_arg) {
- test_exit_data *exit_data = (test_exit_data *)test_malloc(TEST_MODULE_COMMON, sizeof(test_exit_data));
+test_exit_data* test_exit_data_create(test_thread_arg_s* t_arg) {
+ test_exit_data* exit_data = (test_exit_data*)test_malloc(TEST_MODULE_COMMON, sizeof(test_exit_data));
if (exit_data == NULL) {
BBQ_ERR_LOG("malloc failed");
exit(-1);
@@ -79,14 +83,14 @@ test_exit_data *test_exit_data_create(test_thread_arg_s *t_arg) {
return exit_data;
}
-test_exit_data *test_exit_data_destory(test_exit_data *data) {
+test_exit_data* test_exit_data_destory(test_exit_data* data) {
test_data_destory(data->simple_data, data->simple_data_cnt);
test_free(TEST_MODULE_COMMON, data->arg);
test_free(TEST_MODULE_COMMON, data);
}
-test_data **test_data_create(size_t cnt) {
- test_data **simple_data = test_malloc(TEST_MODULE_DATA, sizeof(*simple_data) * cnt);
+test_data** test_data_create(size_t cnt) {
+ test_data** simple_data = test_malloc(TEST_MODULE_DATA, sizeof(*simple_data) * cnt);
test_time_metric enqueue_time = test_clock_time_get();
for (size_t i = 0; i < cnt; i++) {
simple_data[i] = test_malloc(TEST_MODULE_DATA, sizeof(*simple_data[i]));
@@ -97,58 +101,42 @@ test_data **test_data_create(size_t cnt) {
return simple_data;
}
-void test_data_destory(test_data **data, size_t cnt) {
+void test_data_destory(test_data** data, size_t cnt) {
for (size_t i = 0; i < cnt; i++) {
test_free(TEST_MODULE_DATA, data[i]);
}
test_free(TEST_MODULE_DATA, data);
}
-uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) {
+uint32_t test_exec_enqueue(test_queue_s* q, test_data** data, size_t burst_cnt, test_time_metric* op_use_diff, uint16_t thread_idx) {
uint32_t enqueue_cnt = 0;
test_time_metric op_use_start = test_clock_time_get();
- if (burst_cnt == 1) {
- if (q->enqueue_f) {
- if (q->enqueue_f(q->ring, data[0]) == 0) {
- enqueue_cnt = 1;
- }
- }
- } else {
- enqueue_cnt = q->enqueue_burst_f(q->ring, data, burst_cnt, BBQ_F_ARRAY_2D, thread_idx);
- }
+ enqueue_cnt = q->enqueue_burst_f(q->ring, data, burst_cnt, thread_idx);
*op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start);
return enqueue_cnt;
}
-uint32_t test_exec_dequeue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff) {
+uint32_t test_exec_dequeue(test_queue_s* q, test_data** data, size_t burst_cnt, test_time_metric* op_use_diff) {
uint32_t dequeue_cnt = 0;
test_time_metric op_use_start = test_clock_time_get();
- if (burst_cnt == 1) {
- if (q->dequeue_f) {
- if (q->dequeue_f(q->ring, &data[0]) == 0) {
- dequeue_cnt = 1;
- }
- }
- } else {
- dequeue_cnt = q->dequeue_burst_f(q->ring, data, burst_cnt);
- }
+ dequeue_cnt = q->dequeue_burst_f(q->ring, (void**)data, burst_cnt);
*op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start);
return dequeue_cnt;
}
-void *test_thread_producer_start(void *arg) {
+void* test_thread_producer_start(void* arg) {
int ret = 0;
uint32_t enqueue_cnt = 0;
uint64_t ok_cnt = 0;
uint64_t run_times = 0;
- test_thread_arg_s *t_arg = (test_thread_arg_s *)arg;
- test_info_s *test_info = t_arg->test_info;
- test_cfg *cfg = &test_info->cfg;
- test_queue_s *q = t_arg->q;
- test_exit_data *exit_data = test_exit_data_create(t_arg);
+ test_thread_arg_s* t_arg = (test_thread_arg_s*)arg;
+ test_info_s* test_info = t_arg->test_info;
+ test_cfg* cfg = &test_info->cfg;
+ test_queue_s* q = t_arg->q;
+ test_exit_data* exit_data = test_exit_data_create(t_arg);
char thread_name[128] = {0};
uint64_t op_ok_latency_ns = 0;
@@ -176,7 +164,7 @@ void *test_thread_producer_start(void *arg) {
enqueue_cnt = test_exec_enqueue(q, exit_data->simple_data, cfg->ring.burst_cnt, &op_latency, t_arg->thread_idx);
} else {
// 由于rmind不支持指定个数的批量出队列,为了兼容它,这里分配的空间位置entries大小。
- test_data **data = test_data_create(cfg->ring.entries_cnt);
+ test_data** data = test_data_create(cfg->ring.entries_cnt);
if (data == NULL) {
BBQ_ERR_LOG("malloc falied");
exit(-1);
@@ -211,22 +199,22 @@ void *test_thread_producer_start(void *arg) {
pthread_exit(exit_data);
}
-void *test_thread_consumer_start(void *arg) {
+void* test_thread_consumer_start(void* arg) {
uint32_t deq_cnt = -1;
uint64_t ok_cnt = 0;
uint64_t run_times = 0;
- test_thread_arg_s *t_arg = (test_thread_arg_s *)arg;
- test_info_s *test_info = t_arg->test_info;
- test_cfg *cfg = &test_info->cfg;
- test_queue_s *q = t_arg->q;
- test_exit_data *exit_data = test_exit_data_create(t_arg);
+ test_thread_arg_s* t_arg = (test_thread_arg_s*)arg;
+ test_info_s* test_info = t_arg->test_info;
+ test_cfg* cfg = &test_info->cfg;
+ test_queue_s* q = t_arg->q;
+ test_exit_data* exit_data = test_exit_data_create(t_arg);
uint64_t latency_ns = 0;
test_time_metric op_latency = {0};
uint64_t op_ok_latency_ns;
uint64_t op_err_latency_ns = 0;
uint64_t data_error_cnt = 0;
char thread_name[128] = {0};
- test_data **deq_data = test_malloc(TEST_MODULE_DATA, sizeof(*deq_data) * cfg->ring.entries_cnt);
+ test_data** deq_data = test_malloc(TEST_MODULE_DATA, sizeof(*deq_data) * cfg->ring.entries_cnt);
snprintf(thread_name, sizeof(thread_name), "consumer:%x", exit_data->thread_id);
prctl(PR_SET_NAME, thread_name);
@@ -251,7 +239,7 @@ void *test_thread_consumer_start(void *arg) {
deq_cnt = test_exec_dequeue(q, deq_data, cfg->ring.burst_cnt, &op_latency);
if (deq_cnt > 0) {
for (uint32_t i = 0; i < deq_cnt; i++) {
- test_data *data = deq_data[i];
+ test_data* data = deq_data[i];
if (cfg->ring.workload == TEST_WORKLOAD_SIMPLE) {
if (data->data != TEST_DATA_MAGIC) {
BBQ_ERR_LOG("the obtained data is not consistent with the expectation, expect:%u actual:%u", TEST_DATA_MAGIC, data->data);
@@ -295,22 +283,22 @@ void *test_thread_consumer_start(void *arg) {
pthread_exit(exit_data);
}
-void test_wait_all_threads_exit(test_info_s *test_info, uint32_t thread_cnt, pthread_t *threads, test_exit_data **exit_data) {
+void test_wait_all_threads_exit(test_info_s* test_info, uint32_t thread_cnt, pthread_t* threads, test_exit_data** exit_data) {
if (test_info->cfg.run.run_time > 0) {
- BBQ_DBG_LOG("sleep %lus, and notify all threads to exit...", cfg->run.run_time);
+ BBQ_DBG_LOG("sleep %lus, and notify all threads to exit...", test_info->cfg.run.run_time);
sleep(test_info->cfg.run.run_time);
test_info->ctl.running = false;
}
for (uint32_t i = 0; i < thread_cnt; i++) {
- pthread_join(threads[i], (void **)(&exit_data[i])); // 等待每个线程结束
+ pthread_join(threads[i], (void**)(&exit_data[i])); // 等待每个线程结束
}
}
-pthread_t *
-test_one_thread_create(test_info_s *test_info, test_queue_s *q, test_thread_type_e ttype, int core, uint16_t thread_id, pthread_t *thread) {
+pthread_t*
+test_one_thread_create(test_info_s* test_info, test_queue_s* q, test_thread_type_e ttype, int core, uint16_t thread_id, pthread_t* thread) {
BBQ_DBG_LOG("thread type:%d core:%d", ttype, core);
- test_thread_arg_s *arg = (test_thread_arg_s *)test_malloc(TEST_MODULE_COMMON, sizeof(test_thread_arg_s)); // 线程回收时free
+ test_thread_arg_s* arg = (test_thread_arg_s*)test_malloc(TEST_MODULE_COMMON, sizeof(test_thread_arg_s)); // 线程回收时free
arg->test_info = test_info;
arg->q = q;
arg->ttype = ttype;
@@ -329,14 +317,14 @@ test_one_thread_create(test_info_s *test_info, test_queue_s *q, test_thread_type
core_id = (core_id + 1) < max_id ? (core_id + 1) : core_id; \
} while (0)
-pthread_t *test_threads_create(test_info_s *test_info, test_queue_s *q) {
+pthread_t* test_threads_create(test_info_s* test_info, test_queue_s* q) {
// 创建生产者消费者线程
int ret;
uint16_t thread_id = 0;
int core_id = 0;
- test_cfg *cfg = &test_info->cfg;
+ test_cfg* cfg = &test_info->cfg;
size_t thread_cnt = cfg->ring.producer_cnt + cfg->ring.consumer_cnt;
- pthread_t *threads = (pthread_t *)test_malloc(TEST_MODULE_COMMON, sizeof(pthread_t) * thread_cnt); // 存储所有线程ID的数组
+ pthread_t* threads = (pthread_t*)test_malloc(TEST_MODULE_COMMON, sizeof(pthread_t) * thread_cnt); // 存储所有线程ID的数组
pthread_barrier_init(&test_info->ctl.all_threads_start, NULL, thread_cnt);
test_info->ctl.running = true;
@@ -365,8 +353,8 @@ pthread_t *test_threads_create(test_info_s *test_info, test_queue_s *q) {
}
} else {
// MPMC 或 只有生产者 或这有消费者,核心交错分配
- uint32_t pcnt = cfg->ring.producer_cnt; // 生产者个数
- uint32_t ccnt = cfg->ring.consumer_cnt; // 消费者个数
+ uint32_t pcnt = cfg->ring.producer_cnt; // 生产者个数
+ uint32_t ccnt = cfg->ring.consumer_cnt; // 消费者个数
for (core_id = 0; core_id < cfg->base.cores_cnt && pcnt > 0 && ccnt > 0;) {
if ((core_id & 1) == 0) {
// 偶数
@@ -397,12 +385,12 @@ pthread_t *test_threads_create(test_info_s *test_info, test_queue_s *q) {
return threads;
}
-void test_threads_destory(test_info_s *test_info, pthread_t *threads) {
+void test_threads_destory(test_info_s* test_info, pthread_t* threads) {
pthread_barrier_destroy(&test_info->ctl.all_threads_start);
test_free(TEST_MODULE_COMMON, threads);
}
-void test_merge_data_detail(test_merge_data *merge, test_exit_data *exit_data) {
+void test_merge_data_detail(test_merge_data* merge, test_exit_data* exit_data) {
merge->run_times += exit_data->run_times;
merge->ok_cnt += exit_data->ok_cnt;
merge->latency_ns += exit_data->latency_ns;
@@ -411,7 +399,7 @@ void test_merge_data_detail(test_merge_data *merge, test_exit_data *exit_data) {
merge->data_error_cnt += exit_data->data_error_cnt;
}
-void test_merge_all_data(test_exit_data **exit_data, uint32_t thread_cnt, test_merge_s *merge) {
+void test_merge_all_data(test_exit_data** exit_data, uint32_t thread_cnt, test_merge_s* merge) {
test_time_metric p_start = {0};
test_time_metric p_end = {0};
test_time_metric c_start = {0};
diff --git a/bbq/tests/common/test_queue.h b/bbq/tests/common/test_queue.h
index ed2800a..22bce92 100644
--- a/bbq/tests/common/test_queue.h
+++ b/bbq/tests/common/test_queue.h
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-11 11:31:10
+ * @LastEditTime: 2024-06-13 18:59:37
* @Describe: TODO
*/
@@ -15,8 +15,8 @@
typedef void (*test_ring_free_f)(void *ring);
typedef int (*test_ring_enqueue_f)(void *ring, void *obj);
typedef int (*test_ring_dequeue_f)(void *ring, void *obj);
-typedef uint32_t (*test_enqueue_burst_f)(void *ring, void *obj_table, size_t n, unsigned int flags, uint16_t thread_idx);
-typedef uint32_t (*test_dequeue_burst_f)(void *ring, void *obj_table, uint32_t n);
+typedef uint32_t (*test_enqueue_burst_f)(void *ring, void *obj_table, uint32_t n, uint16_t thread_idx);
+typedef uint32_t (*test_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n);
typedef bool (*test_ring_empty_f)(void *ring);
typedef struct {
diff --git a/bbq/tests/unittest/ut_data.cc b/bbq/tests/unittest/ut_data.cc
index 81a17da..74ed0e2 100644
--- a/bbq/tests/unittest/ut_data.cc
+++ b/bbq/tests/unittest/ut_data.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-11 11:31:52
+ * @LastEditTime: 2024-06-13 23:25:39
* @Describe: TODO
*/
@@ -13,6 +13,7 @@ extern "C" {
extern bool bbq_malloc_free_equal();
extern bool test_malloc_free_equal();
extern void bbq_memory_info();
+bool bbq_check_array_bounds(bbq_queue_s* q);
}
TEST(data, correct) {
@@ -25,12 +26,12 @@ TEST(data, correct) {
},
.ring = {
.ring_type = TEST_RING_TYPE_BBQ,
- .producer_cnt = 5,
- .consumer_cnt = 5,
+ .producer_cnt = 1,
+ .consumer_cnt = 1,
.workload = TEST_WORKLOAD_SIMPLE,
.entries_cnt = 4096,
.block_count = 0,
- .burst_cnt = 1,
+ .burst_cnt = 4,
},
.run = {
.run_ok_times = 50000,
@@ -46,12 +47,12 @@ TEST(data, correct) {
ASSERT_TRUE(ret == 0);
// 创建线程
- pthread_t *threads = test_threads_create(&test_info, &q);
+ pthread_t* threads = test_threads_create(&test_info, &q);
ASSERT_TRUE(threads);
// 等待所有线程完成,回收数据
uint32_t thread_cnt = test_info.cfg.ring.producer_cnt + test_info.cfg.ring.consumer_cnt;
- test_exit_data **exit_data = (test_exit_data **)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data **) * (thread_cnt));
+ test_exit_data** exit_data = (test_exit_data**)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data**) * (thread_cnt));
test_wait_all_threads_exit(&test_info, thread_cnt, threads, exit_data);
// 比较数据
@@ -66,7 +67,8 @@ TEST(data, correct) {
}
test_free(TEST_MODULE_UTEST, exit_data);
test_threads_destory(&test_info, threads);
+ EXPECT_TRUE(bbq_check_array_bounds((bbq_queue_s*)q.ring));
test_queue_destory(&q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
-} \ No newline at end of file
+}
diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc
index 049d41f..943aeb8 100644
--- a/bbq/tests/unittest/ut_example.cc
+++ b/bbq/tests/unittest/ut_example.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-05 17:21:25
+ * @LastEditTime: 2024-06-13 23:28:23
* @Describe: TODO
*/
@@ -18,77 +18,7 @@ extern "C" {
#include "ut.h"
extern bool bbq_malloc_free_equal();
extern void bbq_memory_info();
-}
-
-TEST(burst, table_cp_value) {
- test_memory_counter_clear();
-
- uint32_t ret1 = 0;
- uint32_t ret2 = 0;
- uint32_t ret3 = 0;
- bbq_queue_s *q;
- uint32_t buf_cnt = 4096;
- uint32_t cnt1 = 2048;
- uint32_t cnt2 = 2048;
- uint32_t cnt3 = 128;
-
- // 创建测试数据
- uint64_t enq_table1[cnt1];
- uint64_t enq_table2[cnt2];
- uint64_t enq_table3[cnt3];
- memset(enq_table1, 1, sizeof(enq_table1));
- memset(enq_table2, 1, sizeof(enq_table2));
- memset(enq_table3, 1, sizeof(enq_table3));
-
- uint64_t deq_table1[cnt1];
- uint64_t deq_table2[cnt2];
- uint64_t deq_table3[cnt3];
-
- // 创建队列
- q = bbq_ring_create(buf_cnt, sizeof(uint64_t), BBQ_F_COPY_VALUE | BBQ_F_POLICY_RETRY_NEW);
- EXPECT_TRUE(q);
-
- // 批量入队
- ret1 = bbq_enqueue_burst(q, (void *)enq_table1, cnt1, BBQ_F_ARRAY_1D);
- EXPECT_EQ(ret1, cnt1);
- ret3 = bbq_enqueue_burst(q, (void *)enq_table3, cnt3, BBQ_F_ARRAY_1D);
- EXPECT_EQ(ret3, cnt3);
- // 队列已满,实际入队<cnt2
- ret2 = bbq_enqueue_burst(q, (void *)enq_table2, cnt2, BBQ_F_ARRAY_1D);
- EXPECT_EQ(ret2, cnt2 - cnt3);
- EXPECT_EQ(buf_cnt, ret1 + ret2 + ret3);
-
- ret3 = bbq_dequeue_burst(q, (void *)deq_table3, cnt3);
- EXPECT_EQ(ret3, cnt3);
- ret2 = bbq_dequeue_burst(q, (void *)deq_table2, cnt2);
- EXPECT_EQ(ret2, cnt2);
- // 队列已空,实际入队<cnt1
- ret1 = bbq_dequeue_burst(q, (void *)deq_table1, cnt1);
- EXPECT_EQ(ret1, cnt1 - cnt3);
- EXPECT_EQ(buf_cnt, ret1 + ret2 + ret3);
-
- // 验证数据
- uint64_t tmp = 0;
- memset(&tmp, 1, sizeof(tmp));
-
- for (uint32_t i = 0; i < ret1; i++) {
- EXPECT_TRUE(deq_table1[i]);
- EXPECT_EQ(deq_table1[i], tmp);
- }
- for (uint32_t i = 0; i < ret2; i++) {
- EXPECT_TRUE(deq_table2[i]);
- EXPECT_EQ(deq_table2[i], tmp);
- }
- for (uint32_t i = 0; i < ret3; i++) {
- EXPECT_TRUE(deq_table3[i]);
- EXPECT_EQ(deq_table3[i], tmp);
- }
-
- bbq_ring_free(q);
-
- // 内存泄漏检测
- EXPECT_TRUE(bbq_malloc_free_equal());
- EXPECT_TRUE(test_malloc_free_equal());
+extern bool bbq_check_array_bounds(bbq_queue_s* q);
}
#define BUF_CNT 4096
@@ -96,18 +26,18 @@ TEST(burst, table_cp_value) {
#define ENQ_TABLE2_CNT 90
#define ENQ_TABLE3_CNT 10
-TEST(burst, ptr_table_cp_value) {
+TEST(burst, retry_new_cp_value) {
test_memory_counter_clear();
uint32_t ret1 = 0;
uint32_t ret2 = 0;
uint32_t ret3 = 0;
- bbq_queue_s *q;
+ bbq_queue_s* q;
// 创建测试数据,3种数据类型,均将数据值入队列
- uint16_t **enq_table1 = test_enqueue_table_create(ENQ_TABLE1_CNT);
- uint16_t *enq_table2[ENQ_TABLE2_CNT] = {0};
+ uint16_t** enq_table1 = test_enqueue_table_create(ENQ_TABLE1_CNT);
+ uint16_t* enq_table2[ENQ_TABLE2_CNT] = {0};
for (int i = 0; i < ENQ_TABLE2_CNT; i++) {
- enq_table2[i] = (uint16_t *)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t));
+ enq_table2[i] = (uint16_t*)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t));
*enq_table2[i] = TEST_TABLE_DATA_MAGIC;
}
uint16_t enq_table3[ENQ_TABLE3_CNT];
@@ -116,44 +46,42 @@ TEST(burst, ptr_table_cp_value) {
}
uint16_t deq_table1[ENQ_TABLE1_CNT] = {0};
- uint16_t *deq_table2 = (uint16_t *)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t) * BUF_CNT);
+ uint16_t* deq_table2 = (uint16_t*)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t) * BUF_CNT);
// 创建队列
q = bbq_ring_create(BUF_CNT, sizeof(uint16_t), BBQ_F_POLICY_RETRY_NEW | BBQ_F_COPY_VALUE);
EXPECT_TRUE(q);
// 批量入队(全成功)
- ret1 = bbq_enqueue_burst(q, (void *)enq_table1, ENQ_TABLE1_CNT, BBQ_F_ARRAY_2D);
+ ret1 = bbq_enqueue_burst_two_dimensional(q, (void**)enq_table1, ENQ_TABLE1_CNT);
EXPECT_EQ(ret1, ENQ_TABLE1_CNT);
// 批量入队(全成功)
- ret2 = bbq_enqueue_burst(q, (void *)enq_table2, ENQ_TABLE2_CNT, BBQ_F_ARRAY_2D);
+ ret2 = bbq_enqueue_burst_two_dimensional(q, (void**)enq_table2, ENQ_TABLE2_CNT);
EXPECT_EQ(ret2, ENQ_TABLE2_CNT);
// 批量入队(部分成功)
- ret3 = bbq_enqueue_burst(q, (void *)enq_table3, ENQ_TABLE3_CNT, BBQ_F_ARRAY_1D);
+ ret3 = bbq_enqueue_burst_one_dimensional(q, (void*)enq_table3, ENQ_TABLE3_CNT);
EXPECT_EQ(ret3, BUF_CNT - ret1 - ret2);
/*------------------------------------------------------------------*/
// 出队列(全成功)
- ret1 = bbq_dequeue_burst(q, (void *)deq_table1, ENQ_TABLE1_CNT);
+ ret1 = bbq_dequeue_burst_one_dimensional(q, (void*)deq_table1, ENQ_TABLE1_CNT);
EXPECT_EQ(ret1, ENQ_TABLE1_CNT);
// 出队列(部分成功)
- ret2 = bbq_dequeue_burst(q, (void *)deq_table2, BUF_CNT);
+ ret2 = bbq_dequeue_burst_one_dimensional(q, (void*)deq_table2, BUF_CNT);
EXPECT_EQ(ret2, BUF_CNT - ret1);
// 验证数据
for (uint32_t i = 0; i < ret1; i++) {
- EXPECT_TRUE(deq_table1[i]);
EXPECT_EQ(deq_table1[i], TEST_TABLE_DATA_MAGIC) << "i :" << i;
}
for (uint32_t i = 0; i < ret2; i++) {
- EXPECT_TRUE(deq_table2[i]);
EXPECT_EQ(deq_table2[i], TEST_TABLE_DATA_MAGIC) << "i :" << i;
}
-
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
// 释放测试数据
@@ -167,18 +95,18 @@ TEST(burst, ptr_table_cp_value) {
EXPECT_TRUE(test_malloc_free_equal());
}
-TEST(burst, ptr_table_cp_pointer) {
+TEST(burst, retry_new_cp_pointer) {
test_memory_counter_clear();
uint32_t ret1 = 0;
uint32_t ret2 = 0;
uint32_t ret3 = 0;
- bbq_queue_s *q;
+ bbq_queue_s* q;
// 创建测试数据,3种数据类型,均将数据值入队列
- uint16_t **enq_table1 = test_enqueue_table_create(ENQ_TABLE1_CNT);
- uint16_t *enq_table2[ENQ_TABLE2_CNT] = {0};
+ uint16_t** enq_table1 = test_enqueue_table_create(ENQ_TABLE1_CNT);
+ uint16_t* enq_table2[ENQ_TABLE2_CNT] = {0};
for (int i = 0; i < ENQ_TABLE2_CNT; i++) {
- enq_table2[i] = (uint16_t *)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t));
+ enq_table2[i] = (uint16_t*)test_malloc(TEST_MODULE_DATA, sizeof(uint16_t));
*enq_table2[i] = TEST_TABLE_DATA_MAGIC;
}
uint16_t enq_table3[ENQ_TABLE3_CNT];
@@ -186,20 +114,32 @@ TEST(burst, ptr_table_cp_pointer) {
enq_table3[i] = TEST_TABLE_DATA_MAGIC;
}
- uint16_t *deq_table1[BUF_CNT] = {0};
- uint16_t *deq_table2[BUF_CNT] = {0};
- uint16_t *deq_table3[BUF_CNT] = {0};
+ uint16_t* deq_table1[BUF_CNT] = {0};
+ uint16_t* deq_table2[BUF_CNT] = {0};
+ uint16_t** deq_table3 = (uint16_t**)malloc(sizeof(uint16_t*) * BUF_CNT);
// 创建队列
q = bbq_ring_create(BUF_CNT, sizeof(uint16_t), BBQ_F_POLICY_RETRY_NEW | BBQ_F_COPY_POINTER);
EXPECT_TRUE(q);
- ret3 = bbq_enqueue_burst(q, (void *)enq_table3, ENQ_TABLE3_CNT, BBQ_F_ARRAY_1D);
- EXPECT_EQ(ret3, ENQ_TABLE3_CNT);
+ ret1 = bbq_enqueue_burst_two_dimensional(q, (void**)enq_table1, ENQ_TABLE1_CNT);
+ EXPECT_EQ(ret1, ENQ_TABLE1_CNT);
+
+ ret2 = bbq_enqueue_burst_two_dimensional(q, (void**)enq_table2, ENQ_TABLE2_CNT);
+ EXPECT_EQ(ret2, ENQ_TABLE2_CNT);
+
+ ret3 = bbq_enqueue_burst_one_dimensional(q, (void*)enq_table3, ENQ_TABLE3_CNT);
+ EXPECT_EQ(ret3, BUF_CNT - ret1 - ret2);
/*------------------------------------------------------------------*/
- ret3 = bbq_dequeue_burst(q, (void *)deq_table3, ENQ_TABLE3_CNT);
- EXPECT_EQ(ret3, ENQ_TABLE3_CNT);
+ ret1 = bbq_dequeue_burst_two_dimensional(q, (void**)deq_table1, ENQ_TABLE1_CNT);
+ EXPECT_EQ(ret1, ENQ_TABLE1_CNT);
+
+ ret2 = bbq_dequeue_burst_two_dimensional(q, (void**)deq_table2, ENQ_TABLE2_CNT);
+ EXPECT_EQ(ret2, ENQ_TABLE2_CNT);
+
+ ret3 = bbq_dequeue_burst_two_dimensional(q, (void**)deq_table3, ENQ_TABLE3_CNT);
+ EXPECT_EQ(ret3, BUF_CNT - ret1 - ret2);
// 验证数据
for (uint32_t i = 0; i < ret1; i++) {
@@ -216,7 +156,7 @@ TEST(burst, ptr_table_cp_pointer) {
EXPECT_TRUE(deq_table3[i]);
EXPECT_EQ(*deq_table3[i], TEST_TABLE_DATA_MAGIC) << "i :" << i;
}
-
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
// 释放测试数据
diff --git a/bbq/tests/unittest/ut_head_cursor.cc b/bbq/tests/unittest/ut_head_cursor.cc
index 8fe5dae..6f6a42c 100644
--- a/bbq/tests/unittest/ut_head_cursor.cc
+++ b/bbq/tests/unittest/ut_head_cursor.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-11 11:32:48
+ * @LastEditTime: 2024-06-13 23:26:11
* @Describe: TODO
*/
@@ -10,34 +10,35 @@ extern "C" {
#include "ut.h"
extern bool bbq_malloc_free_equal();
extern void bbq_memory_info();
+bool bbq_check_array_bounds(bbq_queue_s* q);
}
-void expect_phead(bbq_queue_s *q, uint64_t idx, uint64_t vsn, int line) {
+void expect_phead(bbq_queue_s* q, uint64_t idx, uint64_t vsn, int line) {
EXPECT_EQ(bbq_idx(q, q->phead), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line;
}
-void expect_chead(bbq_queue_s *q, uint64_t idx, uint64_t vsn, int line) {
+void expect_chead(bbq_queue_s* q, uint64_t idx, uint64_t vsn, int line) {
EXPECT_EQ(bbq_idx(q, q->chead), idx) << "line: " << line;
EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line;
}
-void expect_eq_allocated(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_allocated(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->allocated), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line;
}
-void expect_eq_committed(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_committed(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->committed), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line;
}
-void expect_eq_consumed(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_consumed(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->consumed), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line;
}
-void expect_eq_reserved(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) {
+void expect_eq_reserved(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) {
EXPECT_EQ(bbq_off(q, block->reserved), off) << "line: " << line;
EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line;
}
@@ -47,7 +48,7 @@ TEST(head_cursor, init) {
test_memory_counter_clear();
int ret = 0;
- bbq_queue_s *q;
+ bbq_queue_s* q;
uint32_t bn = 2;
uint32_t bs = 4;
int enqueue_data = TEST_DATA_MAGIC;
@@ -69,7 +70,7 @@ TEST(head_cursor, init) {
expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__);
expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__);
}
-
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
@@ -77,7 +78,7 @@ TEST(head_cursor, init) {
void ut_produce_something(uint32_t produce_cnt) {
int ret = 0;
- bbq_queue_s *q;
+ bbq_queue_s* q;
uint32_t bn = 8;
uint32_t bs = 4096;
int enqueue_data = TEST_DATA_MAGIC;
@@ -122,7 +123,7 @@ void ut_produce_something(uint32_t produce_cnt) {
expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__);
expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__);
}
-
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
}
// 在第一块内生产,然后被消费完
@@ -139,7 +140,7 @@ TEST(head_cursor, produce_something) {
void ut_produce_next_block(uint32_t over) {
int ret = 0;
- bbq_queue_s *q;
+ bbq_queue_s* q;
uint32_t bn = 8;
uint32_t bs = 4096;
uint32_t produce_cnt = bs + over;
@@ -189,6 +190,7 @@ void ut_produce_next_block(uint32_t over) {
expect_eq_reserved(q, &q->blocks[1], over, 1, __LINE__);
expect_eq_consumed(q, &q->blocks[1], over, 1, __LINE__);
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
}
@@ -206,7 +208,7 @@ TEST(head_cursor, produce_next_block) {
void ut_produce_all_loop(uint32_t loop) {
int ret = 0;
- bbq_queue_s *q;
+ bbq_queue_s* q;
uint32_t bn = 8;
uint32_t bs = 4096;
uint32_t produce_cnt = bn * bs;
@@ -245,6 +247,9 @@ void ut_produce_all_loop(uint32_t loop) {
expect_eq_reserved(q, &q->blocks[i], bs, loop, __LINE__);
expect_eq_consumed(q, &q->blocks[i], bs, loop, __LINE__);
}
+
+ EXPECT_TRUE(bbq_check_array_bounds(q));
+ bbq_ring_free(q);
}
// 完成多轮的满生产和满消费
@@ -264,15 +269,17 @@ TEST(boundary, retry_new_full_empty) {
int ret = 0;
uint32_t entries_cnt = 4096;
uint32_t loop = 1000;
- bbq_queue_s *q;
+ bbq_queue_s* q;
- int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt);
+ int* data = (int*)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt);
int tmp_data = 0;
EXPECT_TRUE(data);
q = bbq_ring_create(entries_cnt, sizeof(int), BBQ_F_COPY_VALUE | BBQ_F_POLICY_RETRY_NEW);
EXPECT_TRUE(q);
+ bool test = bbq_check_array_bounds(q);
+
for (uint32_t i = 0; i < loop; i++) {
// 入满队
for (uint32_t j = 0; j < entries_cnt; j++) {
@@ -320,6 +327,7 @@ TEST(boundary, retry_new_full_empty) {
}
test_free(TEST_MODULE_UTEST, data);
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
@@ -356,12 +364,12 @@ TEST(boundary, mpsc_faa) {
ASSERT_TRUE(ret == 0);
// 创建线程
- pthread_t *threads = test_threads_create(&test_info, &q);
+ pthread_t* threads = test_threads_create(&test_info, &q);
ASSERT_TRUE(threads);
// 等待所有线程完成,回收数据
uint32_t thread_cnt = test_info.cfg.ring.producer_cnt + test_info.cfg.ring.consumer_cnt;
- test_exit_data **exit_data = (test_exit_data **)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data **) * (thread_cnt));
+ test_exit_data** exit_data = (test_exit_data**)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data**) * (thread_cnt));
uint32_t i = 0;
test_wait_all_threads_exit(&test_info, thread_cnt, threads, exit_data);
@@ -378,17 +386,18 @@ TEST(boundary, mpsc_faa) {
}
test_free(TEST_MODULE_UTEST, exit_data);
test_threads_destory(&test_info, threads);
+ EXPECT_TRUE(bbq_check_array_bounds((bbq_queue_s*)q.ring));
test_queue_destory(&q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
}
-void debug_head_print(bbq_queue_s *q) {
+void debug_head_print(bbq_queue_s* q) {
printf("phead vsn:%d idx:%d\n", bbq_head_vsn(q, q->phead), bbq_idx(q, q->phead));
printf("chead vsn:%d idx:%d\n", bbq_head_vsn(q, q->chead), bbq_idx(q, q->chead));
}
-void debug_block_print(bbq_queue_s *q) {
+void debug_block_print(bbq_queue_s* q) {
for (int i = 0; i < q->bn; i++) {
printf("[%d]zzzz allocated:vsn:%d off:%d\n", i, bbq_cur_vsn(q, q->blocks[i].allocated.load()), bbq_off(q, q->blocks[i].allocated.load()));
printf("[%d]zzzz committed:%d off:%d\n", i, bbq_cur_vsn(q, q->blocks[i].committed.load()), bbq_off(q, q->blocks[i].committed.load()));
@@ -404,7 +413,7 @@ TEST(boundary, drop_old_full_empty1) {
uint32_t bs = 4;
uint32_t over_cnt = 3;
uint32_t loop = 1000;
- bbq_queue_s *q;
+ bbq_queue_s* q;
int tmp_data = 0;
q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_F_COPY_VALUE | BBQ_F_POLICY_DROP_OLD);
@@ -439,7 +448,7 @@ TEST(boundary, drop_old_full_empty1) {
EXPECT_EQ(q->blocks[i].consumed.load(), 0);
}
}
-
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
EXPECT_TRUE(bbq_malloc_free_equal());
@@ -453,7 +462,7 @@ TEST(boundary, drop_old_full_empty2) {
uint32_t bs = 4;
uint32_t loop = 1000;
uint32_t over_cnt = bs + 2;
- bbq_queue_s *q;
+ bbq_queue_s* q;
EXPECT_EQ(over_cnt / bs, 1);
@@ -511,7 +520,7 @@ TEST(boundary, drop_old_full_empty2) {
i == 1 ? loop + 1 : 0, __LINE__);
EXPECT_EQ(q->blocks[i].consumed.load(), 0);
}
-
+ EXPECT_TRUE(bbq_check_array_bounds(q));
bbq_ring_free(q);
EXPECT_TRUE(bbq_malloc_free_equal());
EXPECT_TRUE(test_malloc_free_equal());
diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc
index e2b53e8..b279234 100644
--- a/bbq/tests/unittest/ut_mix.cc
+++ b/bbq/tests/unittest/ut_mix.cc
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-06 23:54:27
+ * @LastEditTime: 2024-06-13 23:13:00
* @Describe: bbq除了队列操作外,其他函数的测试
*/
diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c
index 7298bb7..6cbbdfa 100644
--- a/perf/benchmark/bcm_benchmark.c
+++ b/perf/benchmark/bcm_benchmark.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-11 11:32:43
+ * @LastEditTime: 2024-06-13 16:32:40
* @Describe: TODO
*/
diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c
index 98b0d29..9eec4f2 100644
--- a/perf/benchmark/bcm_queue.c
+++ b/perf/benchmark/bcm_queue.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-06 17:36:40
+ * @LastEditTime: 2024-06-13 18:58:20
* @Describe: TODO
*/
@@ -8,12 +8,12 @@
#include "ringbuf.h"
static __rte_always_inline unsigned int
-bcm_ring_enqueue_burst(struct rte_ring *r, void *obj_table, size_t n, unsigned int flags) {
+bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, size_t n, uint16_t thread_idx) {
return rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL);
}
static __rte_always_inline unsigned int
-bcm_ring_dequeue_burst(struct rte_ring *r, void *obj_table, unsigned int n) {
+bcm_dpdk_ring_dequeue_burst(struct rte_ring *r, void *obj_table, unsigned int n) {
return rte_ring_dequeue_burst(r, (void **)obj_table, n, NULL);
}
@@ -47,8 +47,8 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) {
q->ring_free_f = (test_ring_free_f)rte_ring_free;
q->enqueue_f = (test_ring_enqueue_f)rte_ring_enqueue;
q->dequeue_f = (test_ring_dequeue_f)rte_ring_dequeue;
- q->enqueue_burst_f = (test_enqueue_burst_f)bcm_ring_enqueue_burst;
- q->dequeue_burst_f = (test_dequeue_burst_f)bcm_ring_dequeue_burst;
+ q->enqueue_burst_f = (test_enqueue_burst_f)bcm_dpdk_ring_enqueue_burst;
+ q->dequeue_burst_f = (test_dequeue_burst_f)bcm_dpdk_ring_dequeue_burst;
return BBQ_OK;
}
@@ -67,7 +67,7 @@ void test_queue_free_rmind(void *ring) {
test_free(TEST_MODULE_RMIND, ring);
}
-uint32_t test_enqueue_burst_rmind(void *ring, void *obj_table, size_t n, unsigned int flags, uint16_t thread_idx) {
+uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, size_t n, unsigned int flags, uint16_t thread_idx) {
uint32_t cnt = 0;
int ret = 0;
size_t off = 0;
@@ -76,7 +76,7 @@ uint32_t test_enqueue_burst_rmind(void *ring, void *obj_table, size_t n, unsigne
size_t len = sizeof(uintptr_t);
for (cnt = 0; cnt < n; cnt++) {
- obj = ((void **)obj_table)[cnt];
+ obj = obj_table[cnt];
uintptr_t uptr = (uintptr_t)obj;
if ((ret = ringbuf_acquire(ring, w, len)) != -1) {
diff --git a/perf/benchmark/benchmark.sh b/perf/benchmark/benchmark.sh
index 4151dfb..272b6d6 100755
--- a/perf/benchmark/benchmark.sh
+++ b/perf/benchmark/benchmark.sh
@@ -1,18 +1,17 @@
#!/bin/bash
###
# @Author: liuyu
-# @LastEditTime: 2024-06-06 22:04:41
+# @LastEditTime: 2024-06-14 10:19:30
# @Describe: TODO
###
ring_type_arr=("bbq" "dpdk" "rmind")
+burst_arr=("32" "16" "8" "1")
# 检查参数数量
-if [ "$#" -ne 4 ]; then
- echo "Usage: $0 <path_to_benchmark> <path_to_config_directory> <ring_type> <burst_cnt>"
- echo " <ring_type> is one of: ${my_array[*]} or input all" # 列出可选的 ring_type
- echo " <burst_cnt> : The number of obj enqueue or dequeue at a time" # 列出可选的 ring_type
+if [ "$#" -ne 3 ]; then
+ echo "Usage: $0 <path_to_benchmark> <path_to_config_directory> <ring_type>"
exit 1
fi
@@ -20,16 +19,11 @@ fi
BENCHMARK_PATH=$1
CONFIG_DIR=$2
RING_TYPE=$3
-BURST_CNT=$4
-
-if [ "$BURST_CNT" -le 0 ]; then
- echo "burst_cnt need >=1"
- exit 1
-fi
function exec_benchmark_ring_type() {
local ini="$1"
local ring="$2"
+ local log_file=$3
# 如果以perf开头的配置文件,还要执行perf统计
if [[ $(basename "$ini") == perf* ]]; then
@@ -37,13 +31,15 @@ function exec_benchmark_ring_type() {
# echo perf stat -a -e L1-dcache-loads,L1-dcache-load-misses,cache-references,cache-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT"
# perf stat -a -e L1-dcache-loads,L1-dcache-load-misses,cache-references,cache-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT"
else
- "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT"
+ "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" 2>&1 | tee -a "$log_file"
fi
}
function exec_benchmark() {
# 提取配置文件名(不带路径)
- INI_FILE=$1
+ local INI_FILE=$1
+ local log_file=$2
+ local burst=$3
# 执行benchmark命令并传递配置文件作为参数
echo "Executing benchmark with $INI_FILE"
@@ -51,11 +47,11 @@ function exec_benchmark() {
# 使用所有ring_type
if [ "$RING_TYPE" == "all" ]; then
for ring_type_tmp in "${ring_type_arr[@]}"; do
- exec_benchmark_ring_type "$INI_FILE" "$ring_type_tmp"
- echo "--------------------------------------------------------------"
+ echo start ring:"$ring_type_tmp"
+ exec_benchmark_ring_type "$INI_FILE" "$ring_type_tmp" "$log_file" "$burst"
done
else
- exec_benchmark_ring_type "$INI_FILE" "$RING_TYPE"
+ exec_benchmark_ring_type "$INI_FILE" "$RING_TYPE" "$log_file" "$burst"
fi
}
@@ -71,16 +67,31 @@ if [ ! -e "$CONFIG_DIR" ]; then
exit 1
fi
-if [[ -f "$CONFIG_DIR" ]]; then
- # 如果是文件,直接执行
- exec_benchmark $CONFIG_DIR
-else
- # 使用 find 命令递归地搜索所有的 .ini 文件,并按文件名排序
- find "$CONFIG_DIR" -type f -name "*.ini" -print0 | sort -z | while IFS= read -r -d '' INI_FILE; do
- if [ -f "$INI_FILE" ]; then
- exec_benchmark $INI_FILE
+# 创建报告目录
+timestamp=$(date +"%Y%m%d_%H%M%S")
+folder_path="/tmp/bbq/$timestamp"
+rm -rf "$folder_path"
+mkdir -p "$folder_path"
+
+for burst in "${burst_arr[@]}"; do
+ burst_dir="$folder_path"/burst_"$burst"
+ mkdir -p "$burst_dir"
+ for i in {1..3}; do
+ echo ======"$i"========
+ report_path="$burst_dir"/report_"$i".txt
+ if [[ -f "$CONFIG_DIR" ]]; then
+ # 如果是文件,直接执行
+ exec_benchmark "$CONFIG_DIR" "$report_path" "$burst"
+ else
+ # 使用 find 命令递归地搜索所有的 .ini 文件,并按文件名排序
+ find "$CONFIG_DIR" -type f -name "*.ini" -print0 | sort -z | while IFS= read -r -d '' INI_FILE; do
+ if [ -f "$INI_FILE" ]; then
+ exec_benchmark "$INI_FILE" "$report_path" "$burst"
+ fi
+ done
fi
- done
-fi
+ sleep 1
+ done
+done
echo "done......."