diff options
Diffstat (limited to 'bbq/src/bbq.c')
| -rw-r--r-- | bbq/src/bbq.c | 153 |
1 files changed, 66 insertions, 87 deletions
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c index 4ad7aba..882ba67 100644 --- a/bbq/src/bbq.c +++ b/bbq/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-07-01 03:04:17 + * @LastEditTime: 2024-07-01 04:05:34 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -109,57 +109,35 @@ static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint6 } // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -enum bbq_module { - BBQ_MODULE_MAIN = 0, - BBQ_MODULE_BLOCK_NB, - BBQ_MODULE_BLOCK_ENTRY, - BBQ_MODULE_MAX, -}; #ifdef BBQ_MEMORY #define BBQ_MEM_MAGIC 0xFF struct bbq_memory_s { - aotmic_uint64 malloc_cnt; - aotmic_uint64 malloc_size; - aotmic_uint64 free_cnt; - aotmic_uint64 free_size; + uint64_t malloc_cnt; + uint64_t malloc_size; + uint64_t free_cnt; + uint64_t free_size; }; -struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0}; +struct bbq_memory_s bbq_memory_g = {0}; #endif -static void *bbq_malloc(enum bbq_module module, int socket_id, size_t size) { - void *ptr = NULL; - if (socket_id >= 0) { - ptr = numa_alloc_onnode(size, 0); - } else { - ptr = malloc(size); - } +void *bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size) { #ifdef BBQ_MEMORY - if (ptr != NULL) { - atomic_fetch_add(&bbq_memory_g[module].malloc_cnt, 1); - atomic_fetch_add(&bbq_memory_g[module].malloc_size, size); - } -#else - AVOID_WARNING(module); + bbq_memory_g.malloc_cnt++; + bbq_memory_g.malloc_size += size; #endif - return ptr; + return malloc(size); } -static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t size) { - if (socket_id >= 0) { - numa_free(ptr, size); - } else { - free(ptr); - } - +void bbq_free_def_callback(void *ptr, + size_t size __attribute__((unused))) { #ifdef BBQ_MEMORY - if (ptr != NULL) { - atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1); - atomic_fetch_add(&bbq_memory_g[module].free_size, size); + if (ptr) { + bbq_memory_g.free_cnt++; + bbq_memory_g.free_size += size; } -#else - AVOID_WARNING(module); #endif + free(ptr); } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ @@ -225,14 +203,13 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_ #ifdef BBQ_MEMORY // 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题 size = (q->bs + 1) * q->entry_size; - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, size); + block->entries = q->malloc_f(q->socket_id, size); char *last_entry = block->entries + q->entry_size * q->bs; memset(block->entries, 0, size); memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else size = q->bs * q->entry_size; - block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - q->bs * q->entry_size); + block->entries = q->malloc_f(q->socket_id, q->bs * q->entry_size); memset(block->entries, 0, size); #endif @@ -268,11 +245,9 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_ void block_destory(struct bbq *q, struct bbq_block *block) { if (block->entries) { #ifdef BBQ_MEMORY - bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - block->entries, (q->bs + 1) * q->entry_size); + q->free_f(block->entries, (q->bs + 1) * q->entry_size); #else - bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, - block->entries, q->bs * q->entry_size); + q->free_f(block->entries, q->bs * q->entry_size); #endif block->entries = NULL; } @@ -295,7 +270,9 @@ static unsigned bbq_ceil_log2(uint64_t x) { } /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ -static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { +static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { int ret = 0; size_t size = 0; if (bbq_check_power_of_two(bn) == false) { @@ -323,7 +300,15 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq *q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q)); + if (malloc_f == NULL) { + malloc_f = bbq_malloc_def_callback; + } + + if (free_f == NULL) { + free_f = bbq_free_def_callback; + } + + struct bbq *q = malloc_f(socket_id, sizeof(*q)); if (q == NULL) { bbq_errno = BBQ_ERR_ALLOC; return NULL; @@ -341,9 +326,11 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, q->chead.single = true; } q->flags = flags; + q->malloc_f = malloc_f; + q->free_f = free_f; size = bn * sizeof(*q->blocks); - q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, size); + q->blocks = q->malloc_f(socket_id, size); if (q->blocks == NULL) { bbq_errno = BBQ_ERR_ALLOC; goto error; @@ -375,19 +362,25 @@ error: } /* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */ -struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { +struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; - return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); } /* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */ -struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) { +struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, + size_t obj_size, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; - return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) { +struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, + int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; uint32_t bn = 0; uint32_t bs = 0; @@ -396,10 +389,11 @@ struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, i return NULL; } - return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f); } -struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) { +struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags, + bbq_malloc_f malloc_f, bbq_free_f free_f) { bbq_errno = BBQ_OK; uint32_t bn = 0; uint32_t bs = 0; @@ -408,7 +402,7 @@ struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t return NULL; } - return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ @@ -421,8 +415,8 @@ void bbq_destory(struct bbq *q) { block_destory(q, &(q->blocks[i])); } - bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); - bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q)); + q->free_f(q->blocks, q->bn * sizeof(*q->blocks)); + q->free_f(q, sizeof(*q)); } #define BBQ_DATA_TYPE_SINGLE 0x0 @@ -479,7 +473,6 @@ void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data, struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) { struct bbq_queue_state_s state = {0}; uint64_t ph_idx = bbq_head_idx(q, ph); - uint64_t ph_ver = bbq_head_vsn(q, ph); struct bbq_block *block = &(q->blocks[ph_idx]); if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) { @@ -491,6 +484,7 @@ struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t uint64_t cur_vsn = bbq_cur_vsn(q, old); uint64_t cur_off = bbq_cur_off(q, old); #ifdef TEST_PERF_1 + // uint64_t ph_ver = bbq_head_vsn(q, ph); // if ((cur_off >= q->bs) || bbq_cursor_overflow(ph_idx, ph_ver, cur_vsn)) { // state.state = BBQ_BLOCK_DONE; // return state; @@ -1100,26 +1094,19 @@ uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint bool bbq_malloc_free_equal() { #ifdef BBQ_MEMORY - bool ret = true; - for (int i = 0; i < BBQ_MODULE_MAX; i++) { - uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); - uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); - if (malloc_cnt != free_cnt) { - BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", i, malloc_cnt, free_cnt); - ret = false; - } + if (bbq_memory_g.malloc_cnt != bbq_memory_g.free_cnt) { + BBQ_ERR_LOG("malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", + bbq_memory_g.malloc_cnt, bbq_memory_g.free_cnt); + return false; + } - uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size); - uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size); - if (malloc_size != free_size) { - BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, free_size); - ret = false; - } + if (bbq_memory_g.malloc_size != bbq_memory_g.free_size) { + BBQ_ERR_LOG("malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", + bbq_memory_g.malloc_size, bbq_memory_g.free_size); + return false; } - return ret; -#else - return true; #endif + return true; } bool bbq_debug_check_array_bounds(struct bbq *q) { @@ -1142,17 +1129,9 @@ bool bbq_debug_check_array_bounds(struct bbq *q) { void bbq_debug_memory_print() { #ifdef BBQ_MEMORY - for (int i = 0; i < BBQ_MODULE_MAX; i++) { - uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt); - uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt); - if (malloc_cnt == 0 && free_cnt == 0) { - continue; - } - - printf("[%d]bbq malloc:%lu free:%lu\n", i, - atomic_load(&bbq_memory_g[i].malloc_cnt), - atomic_load(&bbq_memory_g[i].free_cnt)); - } + printf("bbq malloc:%lu free:%lu\n", + bbq_memory_g.malloc_cnt, + bbq_memory_g.free_cnt); if (bbq_malloc_free_equal()) { printf("all memory free\n"); |
