summaryrefslogtreecommitdiff
path: root/bbq/src/bbq.c
diff options
context:
space:
mode:
Diffstat (limited to 'bbq/src/bbq.c')
-rw-r--r--bbq/src/bbq.c153
1 files changed, 66 insertions, 87 deletions
diff --git a/bbq/src/bbq.c b/bbq/src/bbq.c
index 4ad7aba..882ba67 100644
--- a/bbq/src/bbq.c
+++ b/bbq/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-07-01 03:04:17
+ * @LastEditTime: 2024-07-01 04:05:34
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -109,57 +109,35 @@ static inline uint64_t bbq_atomic64_fetch_add(struct bbq_atomic64 *atomic, uint6
}
// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
-enum bbq_module {
- BBQ_MODULE_MAIN = 0,
- BBQ_MODULE_BLOCK_NB,
- BBQ_MODULE_BLOCK_ENTRY,
- BBQ_MODULE_MAX,
-};
#ifdef BBQ_MEMORY
#define BBQ_MEM_MAGIC 0xFF
struct bbq_memory_s {
- aotmic_uint64 malloc_cnt;
- aotmic_uint64 malloc_size;
- aotmic_uint64 free_cnt;
- aotmic_uint64 free_size;
+ uint64_t malloc_cnt;
+ uint64_t malloc_size;
+ uint64_t free_cnt;
+ uint64_t free_size;
};
-struct bbq_memory_s bbq_memory_g[BBQ_MODULE_MAX] = {0};
+struct bbq_memory_s bbq_memory_g = {0};
#endif
-static void *bbq_malloc(enum bbq_module module, int socket_id, size_t size) {
- void *ptr = NULL;
- if (socket_id >= 0) {
- ptr = numa_alloc_onnode(size, 0);
- } else {
- ptr = malloc(size);
- }
+void *bbq_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size) {
#ifdef BBQ_MEMORY
- if (ptr != NULL) {
- atomic_fetch_add(&bbq_memory_g[module].malloc_cnt, 1);
- atomic_fetch_add(&bbq_memory_g[module].malloc_size, size);
- }
-#else
- AVOID_WARNING(module);
+ bbq_memory_g.malloc_cnt++;
+ bbq_memory_g.malloc_size += size;
#endif
- return ptr;
+ return malloc(size);
}
-static void bbq_free(enum bbq_module module, int socket_id, void *ptr, size_t size) {
- if (socket_id >= 0) {
- numa_free(ptr, size);
- } else {
- free(ptr);
- }
-
+void bbq_free_def_callback(void *ptr,
+ size_t size __attribute__((unused))) {
#ifdef BBQ_MEMORY
- if (ptr != NULL) {
- atomic_fetch_add(&bbq_memory_g[module].free_cnt, 1);
- atomic_fetch_add(&bbq_memory_g[module].free_size, size);
+ if (ptr) {
+ bbq_memory_g.free_cnt++;
+ bbq_memory_g.free_size += size;
}
-#else
- AVOID_WARNING(module);
#endif
+ free(ptr);
}
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
@@ -225,14 +203,13 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_
#ifdef BBQ_MEMORY
// 末尾多分配一个entry,它永远不应该被修改,以此检查是否存在写越界的问题
size = (q->bs + 1) * q->entry_size;
- block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, size);
+ block->entries = q->malloc_f(q->socket_id, size);
char *last_entry = block->entries + q->entry_size * q->bs;
memset(block->entries, 0, size);
memset(last_entry, BBQ_MEM_MAGIC, q->entry_size);
#else
size = q->bs * q->entry_size;
- block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
- q->bs * q->entry_size);
+ block->entries = q->malloc_f(q->socket_id, q->bs * q->entry_size);
memset(block->entries, 0, size);
#endif
@@ -268,11 +245,9 @@ int block_init(struct bbq *q, struct bbq_block *block, bool cursor_init, uint32_
void block_destory(struct bbq *q, struct bbq_block *block) {
if (block->entries) {
#ifdef BBQ_MEMORY
- bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
- block->entries, (q->bs + 1) * q->entry_size);
+ q->free_f(block->entries, (q->bs + 1) * q->entry_size);
#else
- bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id,
- block->entries, q->bs * q->entry_size);
+ q->free_f(block->entries, q->bs * q->entry_size);
#endif
block->entries = NULL;
}
@@ -295,7 +270,9 @@ static unsigned bbq_ceil_log2(uint64_t x) {
}
/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */
-static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
+static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ size_t obj_size, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
int ret = 0;
size_t size = 0;
if (bbq_check_power_of_two(bn) == false) {
@@ -323,7 +300,15 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
socket_id = BBQ_SOCKET_ID_ANY;
}
- struct bbq *q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q));
+ if (malloc_f == NULL) {
+ malloc_f = bbq_malloc_def_callback;
+ }
+
+ if (free_f == NULL) {
+ free_f = bbq_free_def_callback;
+ }
+
+ struct bbq *q = malloc_f(socket_id, sizeof(*q));
if (q == NULL) {
bbq_errno = BBQ_ERR_ALLOC;
return NULL;
@@ -341,9 +326,11 @@ static struct bbq *__bbq_create_bnbs(const char *name, uint32_t bn, uint32_t bs,
q->chead.single = true;
}
q->flags = flags;
+ q->malloc_f = malloc_f;
+ q->free_f = free_f;
size = bn * sizeof(*q->blocks);
- q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, size);
+ q->blocks = q->malloc_f(socket_id, size);
if (q->blocks == NULL) {
bbq_errno = BBQ_ERR_ALLOC;
goto error;
@@ -375,19 +362,25 @@ error:
}
/* 使用自定义的bn、bs创建指针入队的bbq,一般用于单元测试 */
-struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) {
+struct bbq *bbq_create_with_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
bbq_errno = BBQ_OK;
- return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f);
}
/* 使用自定义的bn、bs创建值入队的bbq,一般用于单元测试 */
-struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) {
+struct bbq *bbq_create_elem_with_bnbs(const char *name, uint32_t bn, uint32_t bs,
+ size_t obj_size, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
bbq_errno = BBQ_OK;
- return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f);
}
/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */
-struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) {
+struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size,
+ int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
bbq_errno = BBQ_OK;
uint32_t bn = 0;
uint32_t bs = 0;
@@ -396,10 +389,11 @@ struct bbq *bbq_create_elem(const char *name, uint32_t count, size_t obj_size, i
return NULL;
}
- return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE, malloc_f, free_f);
}
-struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags) {
+struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t flags,
+ bbq_malloc_f malloc_f, bbq_free_f free_f) {
bbq_errno = BBQ_OK;
uint32_t bn = 0;
uint32_t bs = 0;
@@ -408,7 +402,7 @@ struct bbq *bbq_create(const char *name, uint32_t count, int socket_id, uint32_t
return NULL;
}
- return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR, malloc_f, free_f);
}
/* 释放消息队列,与bbq_ring_create系列接口成对*/
@@ -421,8 +415,8 @@ void bbq_destory(struct bbq *q) {
block_destory(q, &(q->blocks[i]));
}
- bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks));
- bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q));
+ q->free_f(q->blocks, q->bn * sizeof(*q->blocks));
+ q->free_f(q, sizeof(*q));
}
#define BBQ_DATA_TYPE_SINGLE 0x0
@@ -479,7 +473,6 @@ void bbq_commit_entry(struct bbq *q, struct bbq_entry_desc *e, void const *data,
struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t n) {
struct bbq_queue_state_s state = {0};
uint64_t ph_idx = bbq_head_idx(q, ph);
- uint64_t ph_ver = bbq_head_vsn(q, ph);
struct bbq_block *block = &(q->blocks[ph_idx]);
if (bbq_cur_off(q, bbq_atomic64_load(&block->allocated)) >= q->bs) {
@@ -491,6 +484,7 @@ struct bbq_queue_state_s bbq_allocate_entry(struct bbq *q, uint64_t ph, uint32_t
uint64_t cur_vsn = bbq_cur_vsn(q, old);
uint64_t cur_off = bbq_cur_off(q, old);
#ifdef TEST_PERF_1
+ // uint64_t ph_ver = bbq_head_vsn(q, ph);
// if ((cur_off >= q->bs) || bbq_cursor_overflow(ph_idx, ph_ver, cur_vsn)) {
// state.state = BBQ_BLOCK_DONE;
// return state;
@@ -1100,26 +1094,19 @@ uint32_t bbq_dequeue_burst_elem(struct bbq *q, void *obj_table, uint32_t n, uint
bool bbq_malloc_free_equal() {
#ifdef BBQ_MEMORY
- bool ret = true;
- for (int i = 0; i < BBQ_MODULE_MAX; i++) {
- uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt);
- uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt);
- if (malloc_cnt != free_cnt) {
- BBQ_ERR_LOG("[module:%d] malloc:%lu free:%lu, bbq mmalloc-free count not equal\n", i, malloc_cnt, free_cnt);
- ret = false;
- }
+ if (bbq_memory_g.malloc_cnt != bbq_memory_g.free_cnt) {
+ BBQ_ERR_LOG("malloc:%lu free:%lu, bbq mmalloc-free count not equal\n",
+ bbq_memory_g.malloc_cnt, bbq_memory_g.free_cnt);
+ return false;
+ }
- uint64_t malloc_size = atomic_load(&bbq_memory_g[i].malloc_size);
- uint64_t free_size = atomic_load(&bbq_memory_g[i].free_size);
- if (malloc_size != free_size) {
- BBQ_ERR_LOG("[module:%d] malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n", i, malloc_size, free_size);
- ret = false;
- }
+ if (bbq_memory_g.malloc_size != bbq_memory_g.free_size) {
+ BBQ_ERR_LOG("malloc:%lu byte free:%lu byte, bbq mmalloc-free size not equal\n",
+ bbq_memory_g.malloc_size, bbq_memory_g.free_size);
+ return false;
}
- return ret;
-#else
- return true;
#endif
+ return true;
}
bool bbq_debug_check_array_bounds(struct bbq *q) {
@@ -1142,17 +1129,9 @@ bool bbq_debug_check_array_bounds(struct bbq *q) {
void bbq_debug_memory_print() {
#ifdef BBQ_MEMORY
- for (int i = 0; i < BBQ_MODULE_MAX; i++) {
- uint64_t malloc_cnt = atomic_load(&bbq_memory_g[i].malloc_cnt);
- uint64_t free_cnt = atomic_load(&bbq_memory_g[i].free_cnt);
- if (malloc_cnt == 0 && free_cnt == 0) {
- continue;
- }
-
- printf("[%d]bbq malloc:%lu free:%lu\n", i,
- atomic_load(&bbq_memory_g[i].malloc_cnt),
- atomic_load(&bbq_memory_g[i].free_cnt));
- }
+ printf("bbq malloc:%lu free:%lu\n",
+ bbq_memory_g.malloc_cnt,
+ bbq_memory_g.free_cnt);
if (bbq_malloc_free_equal()) {
printf("all memory free\n");