From 557417d67aa0eab4fa169e41c1ec1ee53bb3086b Mon Sep 17 00:00:00 2001 From: 刘煜 Date: Fri, 14 Jun 2024 02:22:54 +0000 Subject: burst功能完成 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bbq/include/bbq.h | 48 +-- bbq/src/bbq.c | 550 +++++++++++++++++++++++++---------- bbq/tests/common/test_queue.c | 122 ++++---- bbq/tests/common/test_queue.h | 6 +- bbq/tests/unittest/ut_data.cc | 16 +- bbq/tests/unittest/ut_example.cc | 138 +++------ bbq/tests/unittest/ut_head_cursor.cc | 55 ++-- bbq/tests/unittest/ut_mix.cc | 2 +- perf/benchmark/bcm_benchmark.c | 2 +- perf/benchmark/bcm_queue.c | 14 +- perf/benchmark/benchmark.sh | 63 ++-- 11 files changed, 601 insertions(+), 415 deletions(-) diff --git a/bbq/include/bbq.h b/bbq/include/bbq.h index d08b316..7877cce 100644 --- a/bbq/include/bbq.h +++ b/bbq/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: liuyu@geedgenetworks.com - * @LastEditTime: 2024-06-07 16:30:54 + * @LastEditTime: 2024-06-14 09:27:35 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -28,13 +28,15 @@ using bbq_head = std::atomic; using aotmic_uint64 = std::atomic; #endif +#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) + typedef struct { bbq_cursor committed; // 已提交(version|offset) bbq_cursor allocated; // 已分配(version|offset) bbq_cursor reserved; // 已预留(version|offset) bbq_cursor consumed; // 已消费(version|offset), 在drop-old模式下没用 - char *entries; // BS大小的动态数组 -} bbq_block_s; + char *entries; +} __BBQ_CACHE_ALIGNED bbq_block_s; typedef enum { BBQ_SUCCESS = 0, @@ -46,15 +48,16 @@ typedef enum { } bbq_queue_state_e; typedef struct { - bbq_block_s *block; // 指向所在的block - uint64_t bbq_off; // entry在当前block的偏移(offset) - uint64_t vsn; // allocated游标的版本(version) + uint64_t off; // entry在当前block的偏移(offset) + uint64_t vsn; // allocated游标的版本(vsn) + uint32_t actual_burst; // 实际出入队个数 + bbq_block_s *block; // 指向所在的block } bbq_entry_desc_s; typedef struct { bbq_queue_state_e state; // 队列状态 union { - uint64_t vsn; // state==BLOCK_DONE时生效 + uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 bbq_entry_desc_s e; // state为ALLOCATED、RESERVED生效 }; } bbq_queue_state_s; @@ -73,19 +76,21 @@ typedef struct { #define BBQ_COPY_POINTER(flags) (!(flags & BBQ_F_COPY_VALUE)) typedef struct { - int socket_id; // 在哪个socket_id上使用libnuma分配内存,-1表示无效,使用malloc分配 - size_t bs; // 每个block里entries成员的大小 - size_t bn; // blocks个数 + size_t bs; // 每个block里entries成员的大小 + size_t bn; // blocks个数 size_t obj_size; size_t entry_size; - unsigned int flags; - unsigned int idx_bits; - unsigned int off_bits; + + int32_t socket_id; // 在哪个socket_id上使用libnuma分配内存,-1表示无效,使用malloc分配 + uint32_t flags; + uint32_t idx_bits; + uint32_t off_bits; + uint64_t idx_mask; uint64_t off_mask; + bbq_head phead; // 生产者头,指向块的索引(version|idx) + bbq_head chead; // 消费者头,指向块的索引(version|idx) - bbq_head phead; // 生产者头,指向块的索引(version|idx) - bbq_head chead; // 消费者头,指向块的索引(version|idx) bbq_block_s *blocks; // bn大小的动态数组 } bbq_queue_s; @@ -161,26 +166,21 @@ extern int bbq_dequeue(bbq_queue_s *q, void *deq_data); * @return * 实际出队个数 */ -extern uint32_t bbq_dequeue_burst(bbq_queue_s *q, void *obj_table, uint32_t n); - -// flags 第一位控制传入的是一维数组还是二维数组 -#define BBQ_F_ARRAY_1D 0x0001 -#define BBQ_F_ARRAY_2D BBQ_F_DEFAULT -#define BBQ_CHK_ARRAY_1D(flags) (flags & BBQ_F_ARRAY_1D) -#define BBQ_CHK_ARRAY_2D(flags) (!(flags & BBQ_F_ARRAY_1D)) +extern uint32_t bbq_dequeue_burst_one_dimensional(bbq_queue_s *q, void *obj_table, uint32_t n); +extern uint32_t bbq_dequeue_burst_two_dimensional(bbq_queue_s *q, void **obj_table, uint32_t n); /** * 尝试一次入队多个数据,直到达到最大数量,或是入队失败 * * @param q * 队列指针 * @param obj_table - * @param flags * @param n * obj_table的成员个数 * @return * 实际入队个数 */ -uint32_t bbq_enqueue_burst(bbq_queue_s *q, void *obj_table, size_t n, unsigned int flags); +extern uint32_t bbq_enqueue_burst_one_dimensional(bbq_queue_s *q, void *obj_table, uint32_t n); +extern uint32_t bbq_enqueue_burst_two_dimensional(bbq_queue_s *q, void **obj_table, uint32_t n); // -----------------------------用于内存测试------------------------------- // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index bfdc30f..cea7a54 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-06 12:02:08 + * @LastEditTime: 2024-06-13 23:11:20 * @Email: liuyu@geedgenetworks.com * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -11,26 +11,36 @@ #include #include -extern bbq_queue_state_s allocate_entry(bbq_queue_s *q, bbq_block_s *block); -extern void commit_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *data); -extern bbq_queue_state_s advance_phead(bbq_queue_s *q, uint64_t ph); -extern bbq_queue_state_s reserve_entry(bbq_queue_s *q, bbq_block_s *block); -extern bool consume_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *deq_data); -extern bool advance_chead(bbq_queue_s *q, uint64_t ch, uint64_t ver); -extern inline uint64_t bbq_idx(bbq_queue_s *q, uint64_t x); -extern inline uint64_t bbq_off(bbq_queue_s *q, uint64_t x); -extern inline uint64_t bbq_head_vsn(bbq_queue_s *q, uint64_t x); -extern inline uint64_t bbq_cur_vsn(bbq_queue_s *q, uint64_t x); -extern inline uint64_t set_cur_vsn(bbq_queue_s *q, uint64_t ver); -extern void *bbq_memset(void *data, size_t size); -extern void *bbq_malloc(bbq_module_e module, int socket_id, size_t size); -extern void bbq_free(bbq_module_e module, int socket_id, void *ptr, size_t size); +struct bbq_status { + int32_t status; // 返回状态 + uint32_t actual_burst; // 实际出/入队个数 +}; + +#define BBQ_MEM_MAGIC 0xFF + +extern bbq_queue_state_s allocate_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n); +extern bbq_queue_state_e advance_phead(bbq_queue_s* q, uint64_t ph); +extern bbq_queue_state_s reserve_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n); +extern bool consume_entry(bbq_queue_s* q, bbq_entry_desc_s* e, void* deq_data, uint32_t flag); +extern bool advance_chead(bbq_queue_s* q, uint64_t ch, uint64_t ver); +extern inline uint64_t bbq_idx(bbq_queue_s* q, uint64_t x); +extern inline uint64_t bbq_off(bbq_queue_s* q, uint64_t x); +extern inline uint64_t bbq_head_vsn(bbq_queue_s* q, uint64_t x); +extern inline uint64_t bbq_cur_vsn(bbq_queue_s* q, uint64_t x); +extern inline uint64_t set_cur_vsn(bbq_queue_s* q, uint64_t ver); +extern void* bbq_memset(void* data, size_t size); +extern void* bbq_malloc(bbq_module_e module, int socket_id, size_t size); +extern void bbq_free(bbq_module_e module, int socket_id, void* ptr, size_t size); + +#ifdef BBQ_DEBUG +extern void bbq_struct_print(bbq_queue_s* q); +#endif /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t fetch_max(aotmic_uint64 *atom, uint64_t upd) { +uint64_t fetch_max(aotmic_uint64* atom, uint64_t upd) { uint64_t old_value; do { - old_value = atomic_load(atom); // 读取当前值 + old_value = atomic_load(atom); // 读取当前值 } while (old_value < upd && !atomic_compare_exchange_weak(atom, &old_value, upd)); return old_value; @@ -49,16 +59,25 @@ bool bbq_check_power_of_two(uint32_t n) { * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ uint32_t bbq_blocks_calc(uint32_t entries) { double log_entries = log2((double)entries); - uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 + uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 uint32_t max_value = (over4 > 1) ? over4 : 1; uint32_t n = pow(2, max_value); return n; } /* 块初始化 */ -int block_init(bbq_queue_s *q, bbq_block_s *block, bool cursor_init) { +int block_init(bbq_queue_s* q, bbq_block_s* block, bool cursor_init) { +#ifdef BBQ_MEMORY + // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 + block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, + (q->bs + 1) * q->entry_size); + char* last_entry = block->entries + q->entry_size * q->bs; + memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); +#else block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, - sizeof(*block->entries) * q->bs * q->entry_size); + q->bs * q->entry_size); +#endif + if (block->entries == NULL) { BBQ_ERR_LOG("bbq_malloc error"); return BBQ_ALLOC_ERR; @@ -85,10 +104,15 @@ int block_init(bbq_queue_s *q, bbq_block_s *block, bool cursor_init) { } /* 块清理函数,与block_init成对*/ -void block_cleanup(bbq_queue_s *q, bbq_block_s *block) { +void block_cleanup(bbq_queue_s* q, bbq_block_s* block) { if (block->entries) { +#ifdef BBQ_MEMORY + bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, + block->entries, sizeof(*block->entries) * (q->bs + 1) * q->entry_size); +#else bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, sizeof(*block->entries) * q->bs * q->entry_size); +#endif block->entries = NULL; } } @@ -113,7 +137,7 @@ unsigned ceil_log2(uint64_t x) { } /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存,free_func先设置NULL */ -bbq_queue_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags) { +bbq_queue_s* bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, unsigned int flags) { int ret = 0; bool numa_enable = true; @@ -137,7 +161,7 @@ bbq_queue_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t o socket_id = BBQ_INVALID_SOCKET; } - bbq_queue_s *q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); + bbq_queue_s* q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); if (q == NULL) { BBQ_ERR_LOG("malloc for bbq queue error"); return NULL; @@ -148,7 +172,7 @@ bbq_queue_s *bbq_ring_create_bnbs_with_socket(uint32_t bn, uint32_t bs, size_t o q->bs = bs; q->obj_size = obj_size; if (BBQ_COPY_POINTER(flags)) { - q->entry_size = sizeof(uintptr_t); + q->entry_size = sizeof(void*); } else { q->entry_size = obj_size; } @@ -188,12 +212,12 @@ error: } /* 创建消息队列,bn和bs必须是2的N次幂,free_func先设置NULL */ -bbq_queue_s *bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags) { +bbq_queue_s* bbq_ring_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, unsigned int flags) { return bbq_ring_create_bnbs_with_socket(bn, bs, obj_size, BBQ_INVALID_SOCKET, flags); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -bbq_queue_s *bbq_ring_create_with_socket(uint32_t count, size_t obj_size, int socket_id, unsigned int flags) { +bbq_queue_s* bbq_ring_create_with_socket(uint32_t count, size_t obj_size, int socket_id, unsigned int flags) { if (bbq_check_power_of_two(count) == false || count == 1) { BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%lu", count); return NULL; @@ -205,13 +229,13 @@ bbq_queue_s *bbq_ring_create_with_socket(uint32_t count, size_t obj_size, int so } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,free_func先设置NULL */ -bbq_queue_s *bbq_ring_create(uint32_t count, size_t obj_size, unsigned int flags) { +bbq_queue_s* bbq_ring_create(uint32_t count, size_t obj_size, unsigned int flags) { // 传入无效socket_id,将使用malloc分配内存 return bbq_ring_create_with_socket(count, obj_size, BBQ_INVALID_SOCKET, flags); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ -void bbq_ring_free(bbq_queue_s *q) { +void bbq_ring_free(bbq_queue_s* q) { if (q == NULL) { return; } @@ -224,140 +248,278 @@ void bbq_ring_free(bbq_queue_s *q) { bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q)); } +// flags 第一位控制传入的是一维数组还是二维数组 +#define BBQ_F_SINGLE 0x0 +#define BBQ_F_ARRAY_1D 0x1 +#define BBQ_F_ARRAY_2D 0x2 +void commit_entry(bbq_queue_s* q, bbq_entry_desc_s* e, void* data, uint32_t flag) { + size_t idx = e->off * q->entry_size; + + if (BBQ_COPY_POINTER(q->flags)) { + // 指针入队列 + switch (flag) { + case BBQ_F_ARRAY_1D: + char* tmp = (char*)data; + char* entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) { + memcpy(entry, &tmp, q->entry_size); + entry += q->entry_size; + tmp += q->obj_size; + } + break; + case BBQ_F_ARRAY_2D: + case BBQ_F_SINGLE: + // 二维数组名等于首成员的地址,这里data其实是void **data, &data等同于 &(data[0]) + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + + void** tmp3 = (void**)(e->block->entries); + uint16_t* tmp2 = (uint16_t*)(tmp3[0]); + break; + default: + break; + } + } else { + // 数据入队列 + switch (flag) { + case BBQ_F_ARRAY_1D: + case BBQ_F_SINGLE: + memcpy(&(e->block->entries[idx]), data, q->entry_size * e->actual_burst); + break; + case BBQ_F_ARRAY_2D: + void** tmp = (void**)data; + char* entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) { + memcpy(entry, *tmp, q->entry_size); + entry += q->entry_size; + tmp++; + } + break; + default: + break; + } + } + atomic_fetch_add(&e->block->committed, e->actual_burst); +} + /* 消息队列入队 */ -int bbq_enqueue(bbq_queue_s *q, void *data) { +static struct bbq_status __bbq_enqueue(bbq_queue_s* q, void* data, uint32_t n, uint32_t flag) { + struct bbq_status ret = {.status = 0, .actual_burst = 0}; + if (q == NULL || data == NULL) { - return BBQ_NULL_PTR; + ret.status = BBQ_NULL_PTR; + return ret; } while (true) { // 获取当前phead,转为索引后获取到当前的blk uint64_t ph = atomic_load(&q->phead); - bbq_block_s *blk = &(q->blocks[bbq_idx(q, ph)]); - - bbq_queue_state_s ps; - bbq_queue_state_s state = allocate_entry(q, blk); + bbq_block_s* blk = &(q->blocks[bbq_idx(q, ph)]); + bbq_queue_state_s state = allocate_entry(q, blk, n); switch (state.state) { - case BBQ_ALLOCATED: - commit_entry(q, &state.e, data); - return BBQ_OK; - case BBQ_BLOCK_DONE: - ps = advance_phead(q, ph); - switch (ps.state) { - case BBQ_NO_ENTRY: - return BBQ_QUEUE_FULL; - case BBQ_NOT_AVAILABLE: - return BBQ_QUEUE_BUSY; - case BBQ_SUCCESS: - continue; - } - break; - default: - BBQ_ERR_LOG("Invalid QueueState in bbq_enqueue: %d", state.state); - return BBQ_ERROR; + case BBQ_ALLOCATED: + commit_entry(q, &state.e, data, flag); + ret.actual_burst = state.e.actual_burst; + ret.status = BBQ_OK; + return ret; + case BBQ_BLOCK_DONE: + bbq_queue_state_e pstate = advance_phead(q, ph); + switch (pstate) { + case BBQ_NO_ENTRY: + ret.status = BBQ_QUEUE_FULL; + return ret; + case BBQ_NOT_AVAILABLE: + ret.status = BBQ_QUEUE_BUSY; + return ret; + case BBQ_SUCCESS: + continue; + } + break; + default: + BBQ_DBG_LOG("Invalid QueueState in bbq_enqueue: %d", state.state); + ret.status = BBQ_ERROR; + return ret; } } } +int bbq_enqueue(bbq_queue_s* q, void* data) { + struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_F_SINGLE); + return ret.status; +} + /* 消息队列出队 */ -int bbq_dequeue(bbq_queue_s *q, void *deq_data) { +static struct bbq_status __bbq_dequeue(bbq_queue_s* q, void* deq_data, uint32_t n, uint32_t flag) { + struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { - return BBQ_NULL_PTR; + ret.status = BBQ_NULL_PTR; + return ret; } while (true) { uint64_t ch = atomic_load(&q->chead); - bbq_block_s *blk = &(q->blocks[bbq_idx(q, ch)]); + bbq_block_s* blk = &(q->blocks[bbq_idx(q, ch)]); bbq_queue_state_s state; - state = reserve_entry(q, blk); + state = reserve_entry(q, blk, n); switch (state.state) { - case BBQ_RESERVED: - if (consume_entry(q, &state.e, deq_data)) { - return BBQ_OK; - } else { - continue; - } - case BBQ_NO_ENTRY: - return BBQ_QUEUE_EMPTY; - case BBQ_NOT_AVAILABLE: - return BBQ_QUEUE_BUSY; - case BBQ_BLOCK_DONE: - if (advance_chead(q, ch, state.vsn)) { - continue; - } else { - return BBQ_QUEUE_EMPTY; - } - default: - BBQ_ERR_LOG("Invalid QueueState in dequeue state: %d", state.state); - return BBQ_ERROR; + case BBQ_RESERVED: + if (consume_entry(q, &state.e, deq_data, flag)) { + ret.status = BBQ_OK; + ret.actual_burst = state.e.actual_burst; + return ret; + } else { + continue; + } + case BBQ_NO_ENTRY: + ret.status = BBQ_QUEUE_EMPTY; + return ret; + case BBQ_NOT_AVAILABLE: + ret.status = BBQ_QUEUE_BUSY; + return ret; + case BBQ_BLOCK_DONE: + if (advance_chead(q, ch, state.vsn)) { + continue; + } else { + ret.status = BBQ_QUEUE_EMPTY; + return ret; + } + default: + BBQ_DBG_LOG("Invalid QueueState in dequeue state: %d", state.state); + ret.status = BBQ_ERROR; + return ret; } } } -/* 将多个对象从一个环形队列ring取出,直到达到最大数量,或是出队失败 */ -uint32_t bbq_dequeue_burst(bbq_queue_s *q, void *obj_table, uint32_t n) { +int bbq_dequeue(bbq_queue_s* q, void* deq_data) { + struct bbq_status ret = __bbq_dequeue(q, deq_data, 1, BBQ_F_SINGLE); + return ret.status; +} + +uint32_t bbq_max_burst(bbq_queue_s* q, uint32_t n) { + uint32_t burst = n; + if (burst > q->bs) { + burst = q->bs; + } + + return burst; +} + +uint32_t bbq_dequeue_burst_one_dimensional(bbq_queue_s* q, void* obj_table, uint32_t n) { if (q == NULL || obj_table == NULL) { return BBQ_NULL_PTR; } - uint32_t cnt = 0; - int ret = 0; - void *obj = NULL; + uint32_t burst = 0; + uint32_t ready = 0; + void* obj = obj_table; + struct bbq_status ret = {0}; - for (cnt = 0; cnt < n; cnt++) { - if (BBQ_COPY_VALUE(q->flags)) { - obj = (obj == NULL ? obj_table : obj + q->obj_size); - } else { - obj = &((void **)obj_table)[cnt]; + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_dequeue(q, obj, burst, BBQ_F_ARRAY_1D); + if (ret.status != BBQ_OK) { + break; } + obj += q->obj_size * ret.actual_burst; + ready += ret.actual_burst; - ret = bbq_dequeue(q, obj); - if (ret != BBQ_OK) { - return cnt; + // bbq_struct_print(q); + } + + return ready; +} + +uint32_t bbq_dequeue_burst_two_dimensional(bbq_queue_s* q, void** obj_table, uint32_t n) { + if (q == NULL || obj_table == NULL) { + return BBQ_NULL_PTR; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void** obj_table_tmp = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D); + if (ret.status != BBQ_OK) { + break; } + obj_table_tmp += ret.actual_burst; + ready += ret.actual_burst; + + // bbq_struct_print(q); } - return cnt; + return ready; } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst(bbq_queue_s *q, void *obj_table, size_t n, unsigned int flags) { +uint32_t bbq_enqueue_burst_one_dimensional(bbq_queue_s* q, void* obj_table, uint32_t n) { if (q == NULL || obj_table == NULL) { return BBQ_NULL_PTR; } - uint32_t cnt = 0; - void *obj = NULL; + uint32_t burst = 0; + uint32_t ready = 0; + void* obj = obj_table; + struct bbq_status ret = {0}; - for (cnt = 0; cnt < n; cnt++) { - if (BBQ_CHK_ARRAY_1D(flags)) { - obj = (obj == NULL ? obj_table : obj + q->obj_size); - } else { - obj = ((void **)obj_table)[cnt]; + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_enqueue(q, obj, burst, BBQ_F_ARRAY_1D); + if (ret.status != BBQ_OK) { + break; } + obj += q->obj_size * ret.actual_burst; + ready += ret.actual_burst; - if (BBQ_OK != bbq_enqueue(q, obj)) { - return cnt; + // bbq_struct_print(q); + } + + return ready; +} + +/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ +uint32_t bbq_enqueue_burst_two_dimensional(bbq_queue_s* q, void** obj_table, uint32_t n) { + if (q == NULL || obj_table == NULL) { + return BBQ_NULL_PTR; + } + + uint32_t burst = 0; + uint32_t ready = 0; + void** obj_table_tmp = obj_table; + struct bbq_status ret = {0}; + + while (ready < n) { + burst = bbq_max_burst(q, n - ready); + ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_F_ARRAY_2D); + if (ret.status != BBQ_OK) { + break; } + obj_table_tmp += ret.actual_burst; + ready += ret.actual_burst; + + // bbq_struct_print(q); } - return cnt; + return ready; } -bbq_queue_state_s allocate_entry(bbq_queue_s *q, bbq_block_s *block) { +bbq_queue_state_s allocate_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n) { bbq_queue_state_s state = {0}; if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; } - uint64_t old = atomic_fetch_add(&block->allocated, 1); + uint64_t old = atomic_fetch_add(&block->allocated, n); // burst uint64_t committed_vsn = bbq_cur_vsn(q, atomic_load(&block->committed)); - // committed_vsn,在当前块被初始化后值是不变的,判断vsn是考虑到极限情况下给off预留的空间不足导致,allocated的off溢出(vsn+1) + // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 uint64_t cur_vsn = bbq_cur_vsn(q, old); uint64_t cur_off = bbq_off(q, old); if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) { @@ -365,50 +527,43 @@ bbq_queue_state_s allocate_entry(bbq_queue_s *q, bbq_block_s *block) { return state; } - state.state = BBQ_ALLOCATED; + if (cur_off + n <= q->bs) { + // 可以全部入队 + state.e.actual_burst = n; + } else { + // 部分入队 + state.e.actual_burst = q->bs - cur_off; + } state.e.block = block; state.e.vsn = cur_vsn; - state.e.bbq_off = cur_off; + state.e.off = cur_off; + state.state = BBQ_ALLOCATED; return state; } -void commit_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *data) { - size_t idx = e->bbq_off * q->entry_size; - if (BBQ_COPY_POINTER(q->flags)) { - uintptr_t *uptr = (uintptr_t *)(&(e->block->entries[idx])); - *uptr = (uintptr_t)(data); - } else { - memcpy(&(e->block->entries[idx]), data, q->entry_size); - } - atomic_fetch_add(&e->block->committed, 1); -} - -bbq_queue_state_s advance_phead(bbq_queue_s *q, uint64_t ph) { +bbq_queue_state_e advance_phead(bbq_queue_s* q, uint64_t ph) { // 获取下一个block - bbq_block_s *n_blk; + bbq_block_s* n_blk; n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); uint64_t cur, reserved; - bbq_queue_state_s state = {0}; if (BBQ_POLIC_RETRY_NEW(q->flags)) { cur = atomic_load(&n_blk->consumed); - if (bbq_cur_vsn(q, cur) < bbq_head_vsn(q, ph) || // 生产者赶上了消费者 + if (bbq_cur_vsn(q, cur) < bbq_head_vsn(q, ph) || // 生产者赶上了消费者 (bbq_cur_vsn(q, cur) == bbq_head_vsn(q, ph) && bbq_off(q, cur) != q->bs)) { reserved = atomic_load(&n_blk->reserved); if (bbq_off(q, reserved) == bbq_off(q, cur)) { - state.state = BBQ_NO_ENTRY; + return BBQ_NO_ENTRY; } else { - state.state = BBQ_NOT_AVAILABLE; + return BBQ_NOT_AVAILABLE; } - return state; } } else { cur = atomic_load(&n_blk->committed); // 生产者避免前进到上一轮中尚未完全提交的区块 if (bbq_cur_vsn(q, cur) == bbq_head_vsn(q, ph) && bbq_off(q, cur) != q->bs) { - state.state = BBQ_NOT_AVAILABLE; - return state; + return BBQ_NOT_AVAILABLE; } } @@ -418,44 +573,64 @@ bbq_queue_state_s advance_phead(bbq_queue_s *q, uint64_t ph) { // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1 fetch_max(&q->phead, ph + 1); - state.state = BBQ_SUCCESS; - return state; + return BBQ_SUCCESS; +} + +/* 更新成功 reserve成功的个数 */ +uint32_t reserve_update(bbq_cursor* aotmic, uint64_t reserved, uint32_t n) { + if (n == 1) { + // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 + if (fetch_max(aotmic, reserved + 1) == reserved) { + return 1; + } + + return 0; + } else { + bool ret = atomic_compare_exchange_weak(aotmic, &reserved, reserved + n); + return ret == true ? n : 0; + } } -bbq_queue_state_s reserve_entry(bbq_queue_s *q, bbq_block_s *block) { +bbq_queue_state_s reserve_entry(bbq_queue_s* q, bbq_block_s* block, uint32_t n) { while (true) { bbq_queue_state_s state; uint64_t reserved = atomic_load(&block->reserved); - if (bbq_off(q, reserved) < q->bs) { + uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_svn = bbq_cur_vsn(q, reserved); + + if (reserved_off < q->bs) { uint64_t consumed = atomic_load(&block->consumed); - if (BBQ_POLIC_RETRY_NEW(q->flags) && bbq_cur_vsn(q, reserved) != bbq_cur_vsn(q, consumed)) { + if (BBQ_POLIC_RETRY_NEW(q->flags) && reserved_svn != bbq_cur_vsn(q, consumed)) { // consumed溢出了,这种情况只发生在BBQ_RETRY_NEW,因为BBQ_DROP_OLD模式,consumed没有用到 state.state = BBQ_BLOCK_DONE; - state.vsn = bbq_cur_vsn(q, reserved); + state.vsn = reserved_svn; return state; } uint64_t committed = atomic_load(&block->committed); - if (bbq_off(q, committed) == bbq_off(q, reserved)) { // TODO:多entry关注 + uint64_t committed_off = bbq_off(q, committed); + if (committed_off == reserved_off) { // TODO:多entry关注 state.state = BBQ_NO_ENTRY; return state; } // 当前块的数据没有被全部commited,需要通过判断allocated和committed来判断是否存在正在入队进行中的数据 - if (bbq_off(q, committed) != q->bs) { + if (committed_off != q->bs) { uint64_t allocated = atomic_load(&block->allocated); - if (bbq_off(q, allocated) != bbq_off(q, committed)) { + if (bbq_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; } } - if (fetch_max(&block->reserved, reserved + 1) == reserved) { // TODO:多entry时关注 - // fetch_max返回的是旧值,因此预期返回的旧值要等于局部变量reserved + int32_t tmp = committed_off - reserved_off; + uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + if (reserved_cnt > 0) { // TODO:多entry时关注 state.state = BBQ_RESERVED; + state.e.actual_burst = reserved_cnt; state.e.block = block; - state.e.bbq_off = bbq_off(q, reserved); - state.e.vsn = bbq_cur_vsn(q, reserved); + state.e.off = reserved_off; + state.e.vsn = reserved_svn; return state; } else { @@ -465,23 +640,47 @@ bbq_queue_state_s reserve_entry(bbq_queue_s *q, bbq_block_s *block) { } state.state = BBQ_BLOCK_DONE; - state.vsn = bbq_cur_vsn(q, reserved); + state.vsn = reserved_svn; return state; } } -bool consume_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *deq_data) { - size_t idx = e->bbq_off * q->entry_size; +bool consume_entry(bbq_queue_s* q, bbq_entry_desc_s* e, void* deq_data, uint32_t flag) { + size_t idx = e->off * q->entry_size; + if (BBQ_COPY_POINTER(q->flags)) { - uintptr_t *uptr = (uintptr_t *)(&(e->block->entries[idx])); - *((void **)deq_data) = (void *)(*uptr); + switch (flag) { + case BBQ_F_ARRAY_2D: + case BBQ_F_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_F_ARRAY_1D: + default: + break; + } } else { - memcpy(deq_data, &(e->block->entries[idx]), q->entry_size); + switch (flag) { + case BBQ_F_ARRAY_1D: + case BBQ_F_SINGLE: + memcpy(deq_data, &(e->block->entries[idx]), q->entry_size * e->actual_burst); + break; + case BBQ_F_ARRAY_2D: + void** tmp = (void**)deq_data; + char* entry = &(e->block->entries[idx]); + for (size_t i = 0; i < e->actual_burst; i++) { + memcpy(*tmp, entry, q->entry_size); + entry += q->entry_size; + tmp++; + } + break; + default: + break; + } } uint64_t allocated; if (BBQ_POLIC_RETRY_NEW(q->flags)) { - atomic_fetch_add(&e->block->consumed, 1); + atomic_fetch_add(&e->block->consumed, e->actual_burst); } else { allocated = atomic_load(&e->block->allocated); // 预留的entry所在的块,已经被新生产的数据赶上了 @@ -493,8 +692,8 @@ bool consume_entry(bbq_queue_s *q, bbq_entry_desc_s *e, void *deq_data) { return true; } -bool advance_chead(bbq_queue_s *q, uint64_t ch, uint64_t ver) { - bbq_block_s *n_blk = &(q->blocks[(bbq_idx(q, ch) + 1) & q->idx_mask]); +bool advance_chead(bbq_queue_s* q, uint64_t ch, uint64_t ver) { + bbq_block_s* n_blk = &(q->blocks[(bbq_idx(q, ch) + 1) & q->idx_mask]); uint64_t committed = atomic_load(&n_blk->committed); if (BBQ_POLIC_RETRY_NEW(q->flags)) { @@ -516,27 +715,27 @@ bool advance_chead(bbq_queue_s *q, uint64_t ch, uint64_t ver) { return true; } -inline uint64_t bbq_idx(bbq_queue_s *q, uint64_t x) { +inline uint64_t bbq_idx(bbq_queue_s* q, uint64_t x) { return x & q->idx_mask; } -inline uint64_t bbq_off(bbq_queue_s *q, uint64_t x) { +inline uint64_t bbq_off(bbq_queue_s* q, uint64_t x) { return x & q->off_mask; } -inline uint64_t bbq_head_vsn(bbq_queue_s *q, uint64_t x) { +inline uint64_t bbq_head_vsn(bbq_queue_s* q, uint64_t x) { return x >> q->idx_bits; } -inline uint64_t bbq_cur_vsn(bbq_queue_s *q, uint64_t x) { +inline uint64_t bbq_cur_vsn(bbq_queue_s* q, uint64_t x) { return x >> q->off_bits; } -inline uint64_t set_cur_vsn(bbq_queue_s *q, uint64_t ver) { +inline uint64_t set_cur_vsn(bbq_queue_s* q, uint64_t ver) { return ver << q->off_bits; } -void *bbq_memset(void *data, size_t size) { +void* bbq_memset(void* data, size_t size) { if (data != NULL && size > 0) { memset(data, 0, size); } @@ -552,8 +751,8 @@ typedef struct { bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0}; #endif -void *bbq_malloc(bbq_module_e module, int socket_id, size_t size) { - void *ptr = NULL; +void* bbq_malloc(bbq_module_e module, int socket_id, size_t size) { + void* ptr = NULL; if (socket_id >= 0) { ptr = numa_alloc_onnode(size, 0); } else { @@ -569,7 +768,7 @@ void *bbq_malloc(bbq_module_e module, int socket_id, size_t size) { return ptr; } -void bbq_free(bbq_module_e module, int socket_id, void *ptr, size_t size) { +void bbq_free(bbq_module_e module, int socket_id, void* ptr, size_t size) { #ifdef BBQ_MEMORY if (ptr != NULL) { atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1); @@ -598,7 +797,7 @@ bool bbq_malloc_free_equal() { uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size); uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size); if (malloc_size != free_size) { - BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free size not equal\n", i, malloc_cnt, free_cnt); + BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, free_size); ret = false; } } @@ -608,6 +807,22 @@ bool bbq_malloc_free_equal() { #endif } +bool bbq_check_array_bounds(bbq_queue_s* q) { +#ifdef BBQ_MEMORY + void* value = malloc(q->entry_size); + memset(value, BBQ_MEM_MAGIC, q->entry_size); + + for (size_t i = 0; i < q->bn; i++) { + // 针对内存检查版本,申请了bs+1个entry + char* last_entry = q->blocks[i].entries + q->bs * q->entry_size; + if (memcmp(last_entry, value, q->entry_size) != 0) { + return false; + } + } +#endif + return true; +} + void bbq_memory_info() { #ifdef BBQ_MEMORY for (int i = 0; i < BBQ_MODULE_MAX; i++) { @@ -628,4 +843,25 @@ void bbq_memory_info() { BBQ_ERR_LOG("memory not all free"); } #endif -} \ No newline at end of file +} + +#ifdef BBQ_DEBUG +void bbq_block_print(bbq_queue_s* q, bbq_block_s* block) { + bbq_cursor allocated = atomic_load(&block->allocated); + bbq_cursor committed = atomic_load(&block->committed); + bbq_cursor reserved = atomic_load(&block->reserved); + bbq_cursor consumed = atomic_load(&block->consumed); + printf(" allocated:%lu\n", bbq_off(q, allocated)); + printf(" committed:%lu\n", bbq_off(q, committed)); + printf(" reserved:%lu\n", bbq_off(q, reserved)); + printf(" consumed:%lu\n\n", bbq_off(q, consumed)); +} + +void bbq_struct_print(bbq_queue_s* q) { + printf("ph idx:%lu vsn:%lu\n", bbq_idx(q, q->phead), bbq_head_vsn(q, q->phead)); + bbq_block_print(q, &(q->blocks[bbq_idx(q, q->phead)])); + + // printf("ch idx:%lu vsn:%lu", bbq_idx(q, q->chead), bbq_head_vsn(q, q->chead)); + // bbq_block_print(q, &(q->blocks[bbq_idx(q, q->chead)])); +} +#endif \ No newline at end of file diff --git a/bbq/tests/common/test_queue.c b/bbq/tests/common/test_queue.c index d8787e5..6b57df7 100644 --- a/bbq/tests/common/test_queue.c +++ b/bbq/tests/common/test_queue.c @@ -1,18 +1,23 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-11 11:46:12 + * @LastEditTime: 2024-06-13 23:25:09 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ #include "test_queue.h" -#include "bbq.h" -#include "test_mix.h" #include #include +#include "bbq.h" +#include "test_mix.h" +extern bool bbq_check_array_bounds(bbq_queue_s* q); -int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { - bbq_queue_s *ring; +uint32_t test_bbq_enqueue_burst(void* ring, void* obj_table, uint32_t n, uint16_t thread_idx) { + return bbq_enqueue_burst_two_dimensional(ring, obj_table, n); +} + +int test_queue_init_bbq(test_cfg* cfg, test_queue_s* q) { + bbq_queue_s* ring; size_t obj_size = sizeof(test_data); if (cfg->ring.block_count == 0) { @@ -29,35 +34,34 @@ int test_queue_init_bbq(test_cfg *cfg, test_queue_s *q) { return BBQ_NULL_PTR; } - bbq_queue_s *bbq_ring = (bbq_queue_s *)q->ring; + bbq_queue_s* bbq_ring = (bbq_queue_s*)q->ring; BBQ_DBG_LOG("block number:%lu size:%lu", bbq_ring->bn, bbq_ring->bs); q->ring_free_f = (test_ring_free_f)bbq_ring_free; q->enqueue_f = (test_ring_enqueue_f)bbq_enqueue; q->dequeue_f = (test_ring_dequeue_f)bbq_dequeue; - q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst; - q->enqueue_burst_f = (test_enqueue_burst_f)bbq_enqueue_burst; - + q->dequeue_burst_f = (test_dequeue_burst_f)bbq_dequeue_burst_two_dimensional; + q->enqueue_burst_f = (test_enqueue_burst_f)test_bbq_enqueue_burst; return 0; } -void test_queue_destory(test_queue_s *q) { +void test_queue_destory(test_queue_s* q) { if (q != NULL && q->ring_free_f != NULL) { q->ring_free_f(q->ring); } } -bool test_all_producer_exit(test_info_s *test_info) { +bool test_all_producer_exit(test_info_s* test_info) { return atomic_load(&test_info->ctl.producer_exit) == test_info->cfg.ring.producer_cnt; } -void test_wait_all_threads_ready(test_ctl *ctl) { +void test_wait_all_threads_ready(test_ctl* ctl) { pthread_barrier_wait(&ctl->all_threads_start); BBQ_DBG_LOG("thread init done!"); } -test_exit_data *test_exit_data_create(test_thread_arg_s *t_arg) { - test_exit_data *exit_data = (test_exit_data *)test_malloc(TEST_MODULE_COMMON, sizeof(test_exit_data)); +test_exit_data* test_exit_data_create(test_thread_arg_s* t_arg) { + test_exit_data* exit_data = (test_exit_data*)test_malloc(TEST_MODULE_COMMON, sizeof(test_exit_data)); if (exit_data == NULL) { BBQ_ERR_LOG("malloc failed"); exit(-1); @@ -79,14 +83,14 @@ test_exit_data *test_exit_data_create(test_thread_arg_s *t_arg) { return exit_data; } -test_exit_data *test_exit_data_destory(test_exit_data *data) { +test_exit_data* test_exit_data_destory(test_exit_data* data) { test_data_destory(data->simple_data, data->simple_data_cnt); test_free(TEST_MODULE_COMMON, data->arg); test_free(TEST_MODULE_COMMON, data); } -test_data **test_data_create(size_t cnt) { - test_data **simple_data = test_malloc(TEST_MODULE_DATA, sizeof(*simple_data) * cnt); +test_data** test_data_create(size_t cnt) { + test_data** simple_data = test_malloc(TEST_MODULE_DATA, sizeof(*simple_data) * cnt); test_time_metric enqueue_time = test_clock_time_get(); for (size_t i = 0; i < cnt; i++) { simple_data[i] = test_malloc(TEST_MODULE_DATA, sizeof(*simple_data[i])); @@ -97,58 +101,42 @@ test_data **test_data_create(size_t cnt) { return simple_data; } -void test_data_destory(test_data **data, size_t cnt) { +void test_data_destory(test_data** data, size_t cnt) { for (size_t i = 0; i < cnt; i++) { test_free(TEST_MODULE_DATA, data[i]); } test_free(TEST_MODULE_DATA, data); } -uint32_t test_exec_enqueue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff, uint16_t thread_idx) { +uint32_t test_exec_enqueue(test_queue_s* q, test_data** data, size_t burst_cnt, test_time_metric* op_use_diff, uint16_t thread_idx) { uint32_t enqueue_cnt = 0; test_time_metric op_use_start = test_clock_time_get(); - if (burst_cnt == 1) { - if (q->enqueue_f) { - if (q->enqueue_f(q->ring, data[0]) == 0) { - enqueue_cnt = 1; - } - } - } else { - enqueue_cnt = q->enqueue_burst_f(q->ring, data, burst_cnt, BBQ_F_ARRAY_2D, thread_idx); - } + enqueue_cnt = q->enqueue_burst_f(q->ring, data, burst_cnt, thread_idx); *op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start); return enqueue_cnt; } -uint32_t test_exec_dequeue(test_queue_s *q, test_data **data, size_t burst_cnt, test_time_metric *op_use_diff) { +uint32_t test_exec_dequeue(test_queue_s* q, test_data** data, size_t burst_cnt, test_time_metric* op_use_diff) { uint32_t dequeue_cnt = 0; test_time_metric op_use_start = test_clock_time_get(); - if (burst_cnt == 1) { - if (q->dequeue_f) { - if (q->dequeue_f(q->ring, &data[0]) == 0) { - dequeue_cnt = 1; - } - } - } else { - dequeue_cnt = q->dequeue_burst_f(q->ring, data, burst_cnt); - } + dequeue_cnt = q->dequeue_burst_f(q->ring, (void**)data, burst_cnt); *op_use_diff = test_clock_time_sub(test_clock_time_get(), op_use_start); return dequeue_cnt; } -void *test_thread_producer_start(void *arg) { +void* test_thread_producer_start(void* arg) { int ret = 0; uint32_t enqueue_cnt = 0; uint64_t ok_cnt = 0; uint64_t run_times = 0; - test_thread_arg_s *t_arg = (test_thread_arg_s *)arg; - test_info_s *test_info = t_arg->test_info; - test_cfg *cfg = &test_info->cfg; - test_queue_s *q = t_arg->q; - test_exit_data *exit_data = test_exit_data_create(t_arg); + test_thread_arg_s* t_arg = (test_thread_arg_s*)arg; + test_info_s* test_info = t_arg->test_info; + test_cfg* cfg = &test_info->cfg; + test_queue_s* q = t_arg->q; + test_exit_data* exit_data = test_exit_data_create(t_arg); char thread_name[128] = {0}; uint64_t op_ok_latency_ns = 0; @@ -176,7 +164,7 @@ void *test_thread_producer_start(void *arg) { enqueue_cnt = test_exec_enqueue(q, exit_data->simple_data, cfg->ring.burst_cnt, &op_latency, t_arg->thread_idx); } else { // 由于rmind不支持指定个数的批量出队列,为了兼容它,这里分配的空间位置entries大小。 - test_data **data = test_data_create(cfg->ring.entries_cnt); + test_data** data = test_data_create(cfg->ring.entries_cnt); if (data == NULL) { BBQ_ERR_LOG("malloc falied"); exit(-1); @@ -211,22 +199,22 @@ void *test_thread_producer_start(void *arg) { pthread_exit(exit_data); } -void *test_thread_consumer_start(void *arg) { +void* test_thread_consumer_start(void* arg) { uint32_t deq_cnt = -1; uint64_t ok_cnt = 0; uint64_t run_times = 0; - test_thread_arg_s *t_arg = (test_thread_arg_s *)arg; - test_info_s *test_info = t_arg->test_info; - test_cfg *cfg = &test_info->cfg; - test_queue_s *q = t_arg->q; - test_exit_data *exit_data = test_exit_data_create(t_arg); + test_thread_arg_s* t_arg = (test_thread_arg_s*)arg; + test_info_s* test_info = t_arg->test_info; + test_cfg* cfg = &test_info->cfg; + test_queue_s* q = t_arg->q; + test_exit_data* exit_data = test_exit_data_create(t_arg); uint64_t latency_ns = 0; test_time_metric op_latency = {0}; uint64_t op_ok_latency_ns; uint64_t op_err_latency_ns = 0; uint64_t data_error_cnt = 0; char thread_name[128] = {0}; - test_data **deq_data = test_malloc(TEST_MODULE_DATA, sizeof(*deq_data) * cfg->ring.entries_cnt); + test_data** deq_data = test_malloc(TEST_MODULE_DATA, sizeof(*deq_data) * cfg->ring.entries_cnt); snprintf(thread_name, sizeof(thread_name), "consumer:%x", exit_data->thread_id); prctl(PR_SET_NAME, thread_name); @@ -251,7 +239,7 @@ void *test_thread_consumer_start(void *arg) { deq_cnt = test_exec_dequeue(q, deq_data, cfg->ring.burst_cnt, &op_latency); if (deq_cnt > 0) { for (uint32_t i = 0; i < deq_cnt; i++) { - test_data *data = deq_data[i]; + test_data* data = deq_data[i]; if (cfg->ring.workload == TEST_WORKLOAD_SIMPLE) { if (data->data != TEST_DATA_MAGIC) { BBQ_ERR_LOG("the obtained data is not consistent with the expectation, expect:%u actual:%u", TEST_DATA_MAGIC, data->data); @@ -295,22 +283,22 @@ void *test_thread_consumer_start(void *arg) { pthread_exit(exit_data); } -void test_wait_all_threads_exit(test_info_s *test_info, uint32_t thread_cnt, pthread_t *threads, test_exit_data **exit_data) { +void test_wait_all_threads_exit(test_info_s* test_info, uint32_t thread_cnt, pthread_t* threads, test_exit_data** exit_data) { if (test_info->cfg.run.run_time > 0) { - BBQ_DBG_LOG("sleep %lus, and notify all threads to exit...", cfg->run.run_time); + BBQ_DBG_LOG("sleep %lus, and notify all threads to exit...", test_info->cfg.run.run_time); sleep(test_info->cfg.run.run_time); test_info->ctl.running = false; } for (uint32_t i = 0; i < thread_cnt; i++) { - pthread_join(threads[i], (void **)(&exit_data[i])); // 等待每个线程结束 + pthread_join(threads[i], (void**)(&exit_data[i])); // 等待每个线程结束 } } -pthread_t * -test_one_thread_create(test_info_s *test_info, test_queue_s *q, test_thread_type_e ttype, int core, uint16_t thread_id, pthread_t *thread) { +pthread_t* +test_one_thread_create(test_info_s* test_info, test_queue_s* q, test_thread_type_e ttype, int core, uint16_t thread_id, pthread_t* thread) { BBQ_DBG_LOG("thread type:%d core:%d", ttype, core); - test_thread_arg_s *arg = (test_thread_arg_s *)test_malloc(TEST_MODULE_COMMON, sizeof(test_thread_arg_s)); // 线程回收时free + test_thread_arg_s* arg = (test_thread_arg_s*)test_malloc(TEST_MODULE_COMMON, sizeof(test_thread_arg_s)); // 线程回收时free arg->test_info = test_info; arg->q = q; arg->ttype = ttype; @@ -329,14 +317,14 @@ test_one_thread_create(test_info_s *test_info, test_queue_s *q, test_thread_type core_id = (core_id + 1) < max_id ? (core_id + 1) : core_id; \ } while (0) -pthread_t *test_threads_create(test_info_s *test_info, test_queue_s *q) { +pthread_t* test_threads_create(test_info_s* test_info, test_queue_s* q) { // 创建生产者消费者线程 int ret; uint16_t thread_id = 0; int core_id = 0; - test_cfg *cfg = &test_info->cfg; + test_cfg* cfg = &test_info->cfg; size_t thread_cnt = cfg->ring.producer_cnt + cfg->ring.consumer_cnt; - pthread_t *threads = (pthread_t *)test_malloc(TEST_MODULE_COMMON, sizeof(pthread_t) * thread_cnt); // 存储所有线程ID的数组 + pthread_t* threads = (pthread_t*)test_malloc(TEST_MODULE_COMMON, sizeof(pthread_t) * thread_cnt); // 存储所有线程ID的数组 pthread_barrier_init(&test_info->ctl.all_threads_start, NULL, thread_cnt); test_info->ctl.running = true; @@ -365,8 +353,8 @@ pthread_t *test_threads_create(test_info_s *test_info, test_queue_s *q) { } } else { // MPMC 或 只有生产者 或这有消费者,核心交错分配 - uint32_t pcnt = cfg->ring.producer_cnt; // 生产者个数 - uint32_t ccnt = cfg->ring.consumer_cnt; // 消费者个数 + uint32_t pcnt = cfg->ring.producer_cnt; // 生产者个数 + uint32_t ccnt = cfg->ring.consumer_cnt; // 消费者个数 for (core_id = 0; core_id < cfg->base.cores_cnt && pcnt > 0 && ccnt > 0;) { if ((core_id & 1) == 0) { // 偶数 @@ -397,12 +385,12 @@ pthread_t *test_threads_create(test_info_s *test_info, test_queue_s *q) { return threads; } -void test_threads_destory(test_info_s *test_info, pthread_t *threads) { +void test_threads_destory(test_info_s* test_info, pthread_t* threads) { pthread_barrier_destroy(&test_info->ctl.all_threads_start); test_free(TEST_MODULE_COMMON, threads); } -void test_merge_data_detail(test_merge_data *merge, test_exit_data *exit_data) { +void test_merge_data_detail(test_merge_data* merge, test_exit_data* exit_data) { merge->run_times += exit_data->run_times; merge->ok_cnt += exit_data->ok_cnt; merge->latency_ns += exit_data->latency_ns; @@ -411,7 +399,7 @@ void test_merge_data_detail(test_merge_data *merge, test_exit_data *exit_data) { merge->data_error_cnt += exit_data->data_error_cnt; } -void test_merge_all_data(test_exit_data **exit_data, uint32_t thread_cnt, test_merge_s *merge) { +void test_merge_all_data(test_exit_data** exit_data, uint32_t thread_cnt, test_merge_s* merge) { test_time_metric p_start = {0}; test_time_metric p_end = {0}; test_time_metric c_start = {0}; diff --git a/bbq/tests/common/test_queue.h b/bbq/tests/common/test_queue.h index ed2800a..22bce92 100644 --- a/bbq/tests/common/test_queue.h +++ b/bbq/tests/common/test_queue.h @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-11 11:31:10 + * @LastEditTime: 2024-06-13 18:59:37 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -15,8 +15,8 @@ typedef void (*test_ring_free_f)(void *ring); typedef int (*test_ring_enqueue_f)(void *ring, void *obj); typedef int (*test_ring_dequeue_f)(void *ring, void *obj); -typedef uint32_t (*test_enqueue_burst_f)(void *ring, void *obj_table, size_t n, unsigned int flags, uint16_t thread_idx); -typedef uint32_t (*test_dequeue_burst_f)(void *ring, void *obj_table, uint32_t n); +typedef uint32_t (*test_enqueue_burst_f)(void *ring, void *obj_table, uint32_t n, uint16_t thread_idx); +typedef uint32_t (*test_dequeue_burst_f)(void *ring, void **obj_table, uint32_t n); typedef bool (*test_ring_empty_f)(void *ring); typedef struct { diff --git a/bbq/tests/unittest/ut_data.cc b/bbq/tests/unittest/ut_data.cc index 81a17da..74ed0e2 100644 --- a/bbq/tests/unittest/ut_data.cc +++ b/bbq/tests/unittest/ut_data.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-11 11:31:52 + * @LastEditTime: 2024-06-13 23:25:39 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -13,6 +13,7 @@ extern "C" { extern bool bbq_malloc_free_equal(); extern bool test_malloc_free_equal(); extern void bbq_memory_info(); +bool bbq_check_array_bounds(bbq_queue_s* q); } TEST(data, correct) { @@ -25,12 +26,12 @@ TEST(data, correct) { }, .ring = { .ring_type = TEST_RING_TYPE_BBQ, - .producer_cnt = 5, - .consumer_cnt = 5, + .producer_cnt = 1, + .consumer_cnt = 1, .workload = TEST_WORKLOAD_SIMPLE, .entries_cnt = 4096, .block_count = 0, - .burst_cnt = 1, + .burst_cnt = 4, }, .run = { .run_ok_times = 50000, @@ -46,12 +47,12 @@ TEST(data, correct) { ASSERT_TRUE(ret == 0); // 创建线程 - pthread_t *threads = test_threads_create(&test_info, &q); + pthread_t* threads = test_threads_create(&test_info, &q); ASSERT_TRUE(threads); // 等待所有线程完成,回收数据 uint32_t thread_cnt = test_info.cfg.ring.producer_cnt + test_info.cfg.ring.consumer_cnt; - test_exit_data **exit_data = (test_exit_data **)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data **) * (thread_cnt)); + test_exit_data** exit_data = (test_exit_data**)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data**) * (thread_cnt)); test_wait_all_threads_exit(&test_info, thread_cnt, threads, exit_data); // 比较数据 @@ -66,7 +67,8 @@ TEST(data, correct) { } test_free(TEST_MODULE_UTEST, exit_data); test_threads_destory(&test_info, threads); + EXPECT_TRUE(bbq_check_array_bounds((bbq_queue_s*)q.ring)); test_queue_destory(&q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); -} \ No newline at end of file +} diff --git a/bbq/tests/unittest/ut_example.cc b/bbq/tests/unittest/ut_example.cc index 049d41f..943aeb8 100644 --- a/bbq/tests/unittest/ut_example.cc +++ b/bbq/tests/unittest/ut_example.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-05 17:21:25 + * @LastEditTime: 2024-06-13 23:28:23 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -18,77 +18,7 @@ extern "C" { #include "ut.h" extern bool bbq_malloc_free_equal(); extern void bbq_memory_info(); -} - -TEST(burst, table_cp_value) { - test_memory_counter_clear(); - - uint32_t ret1 = 0; - uint32_t ret2 = 0; - uint32_t ret3 = 0; - bbq_queue_s *q; - uint32_t buf_cnt = 4096; - uint32_t cnt1 = 2048; - uint32_t cnt2 = 2048; - uint32_t cnt3 = 128; - - // 创建测试数据 - uint64_t enq_table1[cnt1]; - uint64_t enq_table2[cnt2]; - uint64_t enq_table3[cnt3]; - memset(enq_table1, 1, sizeof(enq_table1)); - memset(enq_table2, 1, sizeof(enq_table2)); - memset(enq_table3, 1, sizeof(enq_table3)); - - uint64_t deq_table1[cnt1]; - uint64_t deq_table2[cnt2]; - uint64_t deq_table3[cnt3]; - - // 创建队列 - q = bbq_ring_create(buf_cnt, sizeof(uint64_t), BBQ_F_COPY_VALUE | BBQ_F_POLICY_RETRY_NEW); - EXPECT_TRUE(q); - - // 批量入队 - ret1 = bbq_enqueue_burst(q, (void *)enq_table1, cnt1, BBQ_F_ARRAY_1D); - EXPECT_EQ(ret1, cnt1); - ret3 = bbq_enqueue_burst(q, (void *)enq_table3, cnt3, BBQ_F_ARRAY_1D); - EXPECT_EQ(ret3, cnt3); - // 队列已满,实际入队phead), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, q->phead), vsn) << "line: " << line; } -void expect_chead(bbq_queue_s *q, uint64_t idx, uint64_t vsn, int line) { +void expect_chead(bbq_queue_s* q, uint64_t idx, uint64_t vsn, int line) { EXPECT_EQ(bbq_idx(q, q->chead), idx) << "line: " << line; EXPECT_EQ(bbq_head_vsn(q, q->chead), vsn) << "line: " << line; } -void expect_eq_allocated(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_allocated(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->allocated), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->allocated), vsn) << "line: " << line; } -void expect_eq_committed(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_committed(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->committed), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->committed), vsn) << "line: " << line; } -void expect_eq_consumed(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_consumed(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->consumed), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->consumed), vsn) << "line: " << line; } -void expect_eq_reserved(bbq_queue_s *q, bbq_block_s *block, uint64_t off, uint64_t vsn, int line) { +void expect_eq_reserved(bbq_queue_s* q, bbq_block_s* block, uint64_t off, uint64_t vsn, int line) { EXPECT_EQ(bbq_off(q, block->reserved), off) << "line: " << line; EXPECT_EQ(bbq_cur_vsn(q, block->reserved), vsn) << "line: " << line; } @@ -47,7 +48,7 @@ TEST(head_cursor, init) { test_memory_counter_clear(); int ret = 0; - bbq_queue_s *q; + bbq_queue_s* q; uint32_t bn = 2; uint32_t bs = 4; int enqueue_data = TEST_DATA_MAGIC; @@ -69,7 +70,7 @@ TEST(head_cursor, init) { expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__); expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__); } - + EXPECT_TRUE(bbq_check_array_bounds(q)); bbq_ring_free(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); @@ -77,7 +78,7 @@ TEST(head_cursor, init) { void ut_produce_something(uint32_t produce_cnt) { int ret = 0; - bbq_queue_s *q; + bbq_queue_s* q; uint32_t bn = 8; uint32_t bs = 4096; int enqueue_data = TEST_DATA_MAGIC; @@ -122,7 +123,7 @@ void ut_produce_something(uint32_t produce_cnt) { expect_eq_reserved(q, &q->blocks[i], bs, 0, __LINE__); expect_eq_consumed(q, &q->blocks[i], bs, 0, __LINE__); } - + EXPECT_TRUE(bbq_check_array_bounds(q)); bbq_ring_free(q); } // 在第一块内生产,然后被消费完 @@ -139,7 +140,7 @@ TEST(head_cursor, produce_something) { void ut_produce_next_block(uint32_t over) { int ret = 0; - bbq_queue_s *q; + bbq_queue_s* q; uint32_t bn = 8; uint32_t bs = 4096; uint32_t produce_cnt = bs + over; @@ -189,6 +190,7 @@ void ut_produce_next_block(uint32_t over) { expect_eq_reserved(q, &q->blocks[1], over, 1, __LINE__); expect_eq_consumed(q, &q->blocks[1], over, 1, __LINE__); + EXPECT_TRUE(bbq_check_array_bounds(q)); bbq_ring_free(q); } @@ -206,7 +208,7 @@ TEST(head_cursor, produce_next_block) { void ut_produce_all_loop(uint32_t loop) { int ret = 0; - bbq_queue_s *q; + bbq_queue_s* q; uint32_t bn = 8; uint32_t bs = 4096; uint32_t produce_cnt = bn * bs; @@ -245,6 +247,9 @@ void ut_produce_all_loop(uint32_t loop) { expect_eq_reserved(q, &q->blocks[i], bs, loop, __LINE__); expect_eq_consumed(q, &q->blocks[i], bs, loop, __LINE__); } + + EXPECT_TRUE(bbq_check_array_bounds(q)); + bbq_ring_free(q); } // 完成多轮的满生产和满消费 @@ -264,15 +269,17 @@ TEST(boundary, retry_new_full_empty) { int ret = 0; uint32_t entries_cnt = 4096; uint32_t loop = 1000; - bbq_queue_s *q; + bbq_queue_s* q; - int *data = (int *)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt); + int* data = (int*)test_malloc(TEST_MODULE_UTEST, sizeof(*data) * entries_cnt); int tmp_data = 0; EXPECT_TRUE(data); q = bbq_ring_create(entries_cnt, sizeof(int), BBQ_F_COPY_VALUE | BBQ_F_POLICY_RETRY_NEW); EXPECT_TRUE(q); + bool test = bbq_check_array_bounds(q); + for (uint32_t i = 0; i < loop; i++) { // 入满队 for (uint32_t j = 0; j < entries_cnt; j++) { @@ -320,6 +327,7 @@ TEST(boundary, retry_new_full_empty) { } test_free(TEST_MODULE_UTEST, data); + EXPECT_TRUE(bbq_check_array_bounds(q)); bbq_ring_free(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); @@ -356,12 +364,12 @@ TEST(boundary, mpsc_faa) { ASSERT_TRUE(ret == 0); // 创建线程 - pthread_t *threads = test_threads_create(&test_info, &q); + pthread_t* threads = test_threads_create(&test_info, &q); ASSERT_TRUE(threads); // 等待所有线程完成,回收数据 uint32_t thread_cnt = test_info.cfg.ring.producer_cnt + test_info.cfg.ring.consumer_cnt; - test_exit_data **exit_data = (test_exit_data **)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data **) * (thread_cnt)); + test_exit_data** exit_data = (test_exit_data**)test_malloc(TEST_MODULE_UTEST, sizeof(test_exit_data**) * (thread_cnt)); uint32_t i = 0; test_wait_all_threads_exit(&test_info, thread_cnt, threads, exit_data); @@ -378,17 +386,18 @@ TEST(boundary, mpsc_faa) { } test_free(TEST_MODULE_UTEST, exit_data); test_threads_destory(&test_info, threads); + EXPECT_TRUE(bbq_check_array_bounds((bbq_queue_s*)q.ring)); test_queue_destory(&q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); } -void debug_head_print(bbq_queue_s *q) { +void debug_head_print(bbq_queue_s* q) { printf("phead vsn:%d idx:%d\n", bbq_head_vsn(q, q->phead), bbq_idx(q, q->phead)); printf("chead vsn:%d idx:%d\n", bbq_head_vsn(q, q->chead), bbq_idx(q, q->chead)); } -void debug_block_print(bbq_queue_s *q) { +void debug_block_print(bbq_queue_s* q) { for (int i = 0; i < q->bn; i++) { printf("[%d]zzzz allocated:vsn:%d off:%d\n", i, bbq_cur_vsn(q, q->blocks[i].allocated.load()), bbq_off(q, q->blocks[i].allocated.load())); printf("[%d]zzzz committed:%d off:%d\n", i, bbq_cur_vsn(q, q->blocks[i].committed.load()), bbq_off(q, q->blocks[i].committed.load())); @@ -404,7 +413,7 @@ TEST(boundary, drop_old_full_empty1) { uint32_t bs = 4; uint32_t over_cnt = 3; uint32_t loop = 1000; - bbq_queue_s *q; + bbq_queue_s* q; int tmp_data = 0; q = bbq_ring_create_bnbs(bn, bs, sizeof(int), BBQ_F_COPY_VALUE | BBQ_F_POLICY_DROP_OLD); @@ -439,7 +448,7 @@ TEST(boundary, drop_old_full_empty1) { EXPECT_EQ(q->blocks[i].consumed.load(), 0); } } - + EXPECT_TRUE(bbq_check_array_bounds(q)); bbq_ring_free(q); EXPECT_TRUE(bbq_malloc_free_equal()); @@ -453,7 +462,7 @@ TEST(boundary, drop_old_full_empty2) { uint32_t bs = 4; uint32_t loop = 1000; uint32_t over_cnt = bs + 2; - bbq_queue_s *q; + bbq_queue_s* q; EXPECT_EQ(over_cnt / bs, 1); @@ -511,7 +520,7 @@ TEST(boundary, drop_old_full_empty2) { i == 1 ? loop + 1 : 0, __LINE__); EXPECT_EQ(q->blocks[i].consumed.load(), 0); } - + EXPECT_TRUE(bbq_check_array_bounds(q)); bbq_ring_free(q); EXPECT_TRUE(bbq_malloc_free_equal()); EXPECT_TRUE(test_malloc_free_equal()); diff --git a/bbq/tests/unittest/ut_mix.cc b/bbq/tests/unittest/ut_mix.cc index e2b53e8..b279234 100644 --- a/bbq/tests/unittest/ut_mix.cc +++ b/bbq/tests/unittest/ut_mix.cc @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-06 23:54:27 + * @LastEditTime: 2024-06-13 23:13:00 * @Email: liuyu@geedgenetworks.com * @Describe: bbq除了队列操作外,其他函数的测试 */ diff --git a/perf/benchmark/bcm_benchmark.c b/perf/benchmark/bcm_benchmark.c index 7298bb7..6cbbdfa 100644 --- a/perf/benchmark/bcm_benchmark.c +++ b/perf/benchmark/bcm_benchmark.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-11 11:32:43 + * @LastEditTime: 2024-06-13 16:32:40 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ diff --git a/perf/benchmark/bcm_queue.c b/perf/benchmark/bcm_queue.c index 98b0d29..9eec4f2 100644 --- a/perf/benchmark/bcm_queue.c +++ b/perf/benchmark/bcm_queue.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-06 17:36:40 + * @LastEditTime: 2024-06-13 18:58:20 * @Email: liuyu@geedgenetworks.com * @Describe: TODO */ @@ -8,12 +8,12 @@ #include "ringbuf.h" static __rte_always_inline unsigned int -bcm_ring_enqueue_burst(struct rte_ring *r, void *obj_table, size_t n, unsigned int flags) { +bcm_dpdk_ring_enqueue_burst(struct rte_ring *r, void **obj_table, size_t n, uint16_t thread_idx) { return rte_ring_enqueue_burst(r, (void *const *)obj_table, n, NULL); } static __rte_always_inline unsigned int -bcm_ring_dequeue_burst(struct rte_ring *r, void *obj_table, unsigned int n) { +bcm_dpdk_ring_dequeue_burst(struct rte_ring *r, void *obj_table, unsigned int n) { return rte_ring_dequeue_burst(r, (void **)obj_table, n, NULL); } @@ -47,8 +47,8 @@ int test_queue_init_dpdk(test_cfg *cfg, test_queue_s *q) { q->ring_free_f = (test_ring_free_f)rte_ring_free; q->enqueue_f = (test_ring_enqueue_f)rte_ring_enqueue; q->dequeue_f = (test_ring_dequeue_f)rte_ring_dequeue; - q->enqueue_burst_f = (test_enqueue_burst_f)bcm_ring_enqueue_burst; - q->dequeue_burst_f = (test_dequeue_burst_f)bcm_ring_dequeue_burst; + q->enqueue_burst_f = (test_enqueue_burst_f)bcm_dpdk_ring_enqueue_burst; + q->dequeue_burst_f = (test_dequeue_burst_f)bcm_dpdk_ring_dequeue_burst; return BBQ_OK; } @@ -67,7 +67,7 @@ void test_queue_free_rmind(void *ring) { test_free(TEST_MODULE_RMIND, ring); } -uint32_t test_enqueue_burst_rmind(void *ring, void *obj_table, size_t n, unsigned int flags, uint16_t thread_idx) { +uint32_t test_enqueue_burst_rmind(void *ring, void **obj_table, size_t n, unsigned int flags, uint16_t thread_idx) { uint32_t cnt = 0; int ret = 0; size_t off = 0; @@ -76,7 +76,7 @@ uint32_t test_enqueue_burst_rmind(void *ring, void *obj_table, size_t n, unsigne size_t len = sizeof(uintptr_t); for (cnt = 0; cnt < n; cnt++) { - obj = ((void **)obj_table)[cnt]; + obj = obj_table[cnt]; uintptr_t uptr = (uintptr_t)obj; if ((ret = ringbuf_acquire(ring, w, len)) != -1) { diff --git a/perf/benchmark/benchmark.sh b/perf/benchmark/benchmark.sh index 4151dfb..272b6d6 100755 --- a/perf/benchmark/benchmark.sh +++ b/perf/benchmark/benchmark.sh @@ -1,18 +1,17 @@ #!/bin/bash ### # @Author: liuyu -# @LastEditTime: 2024-06-06 22:04:41 +# @LastEditTime: 2024-06-14 10:19:30 # @Email: liuyu@geedgenetworks.com # @Describe: TODO ### ring_type_arr=("bbq" "dpdk" "rmind") +burst_arr=("32" "16" "8" "1") # 检查参数数量 -if [ "$#" -ne 4 ]; then - echo "Usage: $0 " - echo " is one of: ${my_array[*]} or input all" # 列出可选的 ring_type - echo " : The number of obj enqueue or dequeue at a time" # 列出可选的 ring_type +if [ "$#" -ne 3 ]; then + echo "Usage: $0 " exit 1 fi @@ -20,16 +19,11 @@ fi BENCHMARK_PATH=$1 CONFIG_DIR=$2 RING_TYPE=$3 -BURST_CNT=$4 - -if [ "$BURST_CNT" -le 0 ]; then - echo "burst_cnt need >=1" - exit 1 -fi function exec_benchmark_ring_type() { local ini="$1" local ring="$2" + local log_file=$3 # 如果以perf开头的配置文件,还要执行perf统计 if [[ $(basename "$ini") == perf* ]]; then @@ -37,13 +31,15 @@ function exec_benchmark_ring_type() { # echo perf stat -a -e L1-dcache-loads,L1-dcache-load-misses,cache-references,cache-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" # perf stat -a -e L1-dcache-loads,L1-dcache-load-misses,cache-references,cache-misses "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" else - "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" + "$BENCHMARK_PATH" "$ini" "$ring" "$BURST_CNT" 2>&1 | tee -a "$log_file" fi } function exec_benchmark() { # 提取配置文件名(不带路径) - INI_FILE=$1 + local INI_FILE=$1 + local log_file=$2 + local burst=$3 # 执行benchmark命令并传递配置文件作为参数 echo "Executing benchmark with $INI_FILE" @@ -51,11 +47,11 @@ function exec_benchmark() { # 使用所有ring_type if [ "$RING_TYPE" == "all" ]; then for ring_type_tmp in "${ring_type_arr[@]}"; do - exec_benchmark_ring_type "$INI_FILE" "$ring_type_tmp" - echo "--------------------------------------------------------------" + echo start ring:"$ring_type_tmp" + exec_benchmark_ring_type "$INI_FILE" "$ring_type_tmp" "$log_file" "$burst" done else - exec_benchmark_ring_type "$INI_FILE" "$RING_TYPE" + exec_benchmark_ring_type "$INI_FILE" "$RING_TYPE" "$log_file" "$burst" fi } @@ -71,16 +67,31 @@ if [ ! -e "$CONFIG_DIR" ]; then exit 1 fi -if [[ -f "$CONFIG_DIR" ]]; then - # 如果是文件,直接执行 - exec_benchmark $CONFIG_DIR -else - # 使用 find 命令递归地搜索所有的 .ini 文件,并按文件名排序 - find "$CONFIG_DIR" -type f -name "*.ini" -print0 | sort -z | while IFS= read -r -d '' INI_FILE; do - if [ -f "$INI_FILE" ]; then - exec_benchmark $INI_FILE +# 创建报告目录 +timestamp=$(date +"%Y%m%d_%H%M%S") +folder_path="/tmp/bbq/$timestamp" +rm -rf "$folder_path" +mkdir -p "$folder_path" + +for burst in "${burst_arr[@]}"; do + burst_dir="$folder_path"/burst_"$burst" + mkdir -p "$burst_dir" + for i in {1..3}; do + echo ======"$i"======== + report_path="$burst_dir"/report_"$i".txt + if [[ -f "$CONFIG_DIR" ]]; then + # 如果是文件,直接执行 + exec_benchmark "$CONFIG_DIR" "$report_path" "$burst" + else + # 使用 find 命令递归地搜索所有的 .ini 文件,并按文件名排序 + find "$CONFIG_DIR" -type f -name "*.ini" -print0 | sort -z | while IFS= read -r -d '' INI_FILE; do + if [ -f "$INI_FILE" ]; then + exec_benchmark "$INI_FILE" "$report_path" "$burst" + fi + done fi - done -fi + sleep 1 + done +done echo "done......." -- cgit v1.2.3