summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorliuyu <[email protected]>2024-06-26 11:37:09 +0800
committerliuyu <[email protected]>2024-06-26 11:37:09 +0800
commitc6735742b1990c65fc67fd4e2873fa1640dc4f7d (patch)
treef876bd7eda6a227de7632d199e876c8ee54d60e1
parent39e368c5ae0f2dc3bd33ab03c990f866468b0a32 (diff)
bugfix:更新bbq库实现,添加error number,修复burst失败时返回非0bug。
-rw-r--r--infra/include/bbq.h177
-rw-r--r--infra/include/vnode.h2
-rw-r--r--infra/src/bbq.c437
-rw-r--r--infra/src/vnode_common.c12
-rw-r--r--infra/src/vnode_common.h2
-rw-r--r--infra/src/vnode_mirror.c16
-rw-r--r--infra/test/TestVNode.cc44
7 files changed, 410 insertions, 280 deletions
diff --git a/infra/include/bbq.h b/infra/include/bbq.h
index 6e4d84e..d8afb3d 100644
--- a/infra/include/bbq.h
+++ b/infra/include/bbq.h
@@ -1,6 +1,6 @@
/*
* @Author: [email protected]
- * @LastEditTime: 2024-06-20 18:03:59
+ * @LastEditTime: 2024-06-25 16:49:38
* @Describe: bbq(Block-based Bounded Queue)头文件
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
*/
@@ -26,6 +26,7 @@ using aotmic_uint64 = std::atomic<uint64_t>;
#define BBQ_SOCKET_ID_ANY -1
#define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64)))
+#define BBQ_SYMBOL_MAX 64
struct bbq_block
{
@@ -33,43 +34,40 @@ struct bbq_block
bbq_cursor allocated; // 已分配(version|offset)
bbq_cursor reserved; // 已预留(version|offset)
bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到
- char * entries; // 存储大小可变的entry,分配空间大小:bs * entry_size
+ char * entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size
} __BBQ_CACHE_ALIGNED;
struct bbq
{
- size_t bn; // blocks的个数
- size_t bs; // blocks.entries的个数
- size_t entry_size; // blocks.entries里每个entry的大小
+ char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED;
- int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
- uint32_t flags; // 标记:retry new 模式,还是drop old模式
- uint32_t idx_bits; // bbq_head里idx所占的位数
- uint32_t off_bits; // bbq_cursor里offset所占的位数
-
- uint64_t idx_mask; // idx_bits偏移后的掩码
- uint64_t off_mask; // off_bits偏移后的掩码
- bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
- bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
+ int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配
+ uint32_t bn; // blocks的个数
+ uint32_t bs; // blocks.entries的个数
+ uint32_t flags; // 标记:retry new 模式,还是drop old模式
+ uint32_t idx_bits; // bbq_head里idx所占的位数
+ uint32_t off_bits; // bbq_cursor里offset所占的位数
+ uint64_t idx_mask; // idx_bits偏移后的掩码
+ uint64_t off_mask; // off_bits偏移后的掩码
+ uint64_t entry_size; // blocks.entries里每个entry的大小
+ bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx
+ bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx
struct bbq_block * blocks; // bn大小的数组
-};
-
-#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
-// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */
-// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */
+} __BBQ_CACHE_ALIGNED;
#define BBQ_F_DEFAULT 0x0
+#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */
#define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */
-#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */
-#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */
/**
* 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。
* 对应入队函数:bbq_enqueue、bbq_enqueue_burst
* 对应出队函数:bbq_dequeue、bbq_dequeue_burst
*
+ * @param[in] name
+ * 队列名称
* @param[in] count
- * 队列所有entry的个数,count必须大于1,且是2的N次方。
+ * 队列大小,参数必须大于1,且是2的N次方。
* @param[in] socket_id
* 多numa架构下,调用libnuma库函数针对指定socket分配内存。
* 当检测到不支持多numa,将转为malloc分配内存。
@@ -79,9 +77,13 @@ struct bbq
* 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
* @return
* 非NULL:消息队列结构体指针,用于后续出队入队等操作。
- * NULL:创建失败。
+ * NULL:创建失败,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围
+ * - BBQ_ERR_ALLOC:申请内存失败
+ * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方
+ * - BBQ_ERR_INPUT_NULL:name传入空指针
*/
-extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags);
+extern struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags);
/**
* 消息队列单个指针入队
@@ -89,10 +91,14 @@ extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags);
* @param[in] q
* 队列指针
* @param[in] data
- * 则传入一维指针,如:
- * int *data = malloc(sizeof(int));*data = 1; 传入&data
+ * 指向入队指针的指针,如:
+ * int *data = malloc(sizeof(int));*data = TEST_DATA; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码。可能存在以下错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_enqueue(struct bbq * q, void * const * data);
@@ -102,15 +108,19 @@ extern int bbq_enqueue(struct bbq * q, void * const * data);
* @param[in] q
* 队列指针
* @param[out] data
- * 则传入二维指针,如:
+ * 传入二级指针,如:
* int *data = NULL; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_dequeue(struct bbq * q, void ** data);
/**
- * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数
+ * 消息队列批量入队(指针入队),尽可能一次入队n个指针,返回实际成功入队个数
*
* @param[in] q
* 队列指针
@@ -119,41 +129,52 @@ extern int bbq_dequeue(struct bbq * q, void ** data);
* uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT);
* for(int i=0;i<BUF_CNT;i++){
* obj_table[i] = malloc(sizeof(uint16_t));
- * obj_table[i] = 1;
+ * obj_table[i] = TEST_DATA;
* }
* 传入obj_table
* @param[in] n
* 尝试一次入队的个数
- * @param[out] free_space
- * 如果为非NULL,返回执行完当前操作后剩余的entry个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列剩余的个数。注:该赋值可能会带来些许的性能损耗。
* @return
- * 返回实际成功入队的个数
+ * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
-extern uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space);
+extern uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * wait_consumed);
/**
- * 消息队列批量出队(指针出队),尽可能一次出队n个数据,返回实际成功出队个数
+ * 消息队列批量指针出队,尽可能一次出队n个数据,返回实际成功出队个数
*
* @param[in] q
* 队列指针
* @param[out] obj_table
- * 存储出队的指针,如:
- * uint16_t **obj_table = malloc(sizeof(uint16_t *))
- * 传入obj_table
+ * 用于存储出队的指针,如:
+ * uint16_t **obj_table = malloc(sizeof(uint16_t *)); 传入obj_table
* @param[in] n
* 尝试一次出队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。注:该赋值可能会带来些许的性能损耗
* @return
- * 返回实际成功出队的个数
+ * 返回实际成功出队的个数。当出队返回0时,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
-extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n);
+extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, uint32_t * wait_consumed);
/**
- * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针指向的数据拷贝入队。
+ * 创建bbq队列,使用当前函数创建的队列,在后续操作会把指针指向的数据拷贝入队。
* 对应入队函数:bbq_enqueue_elem、bbq_enqueue_burst_elem
* 对应出队函数:bbq_dequeue_elem、bbq_dequeue_burst_elem
*
+ * @param[in] name
+ * 队列名称
* @param[in] count
- * 队列所有entry的个数,count必须大于1,且是2的N次方。
+ * 队列大小,参数必须大于1,且是2的N次方。
* @param[in] socket_id
* 多numa架构下,调用libnuma库函数针对指定socket分配内存。
* 当检测到不支持多numa,将转为malloc分配内存。
@@ -163,9 +184,13 @@ extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n)
* 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功
* @return
* 非NULL:消息队列结构体指针,用于后续出队入队等操作。
- * NULL:创建失败。
+ * NULL:创建失败。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围
+ * - BBQ_ERR_ALLOC:申请内存失败
+ * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方
+ * - BBQ_ERR_INPUT_NULL:name传入空指针
*/
-extern struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_id, uint32_t flags);
+extern struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags);
/**
* 消息队列单个数据入队(指针指向的数据将被拷贝)
@@ -173,9 +198,13 @@ extern struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_
* @param[in] q
* 队列指针
* @param[in] data
- * 传入一维指针,如:int data = 1; 传入&data
+ * 传入一级指针,如:int data = 1; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_enqueue_elem(struct bbq * q, void const * data);
@@ -185,9 +214,13 @@ extern int bbq_enqueue_elem(struct bbq * q, void const * data);
* @param[in] q
* 队列指针
* @param[in] data
- * 则传入一维指针,如:int data; 传入&data
+ * 则传入一级指针,如:int data; 传入&data
* @return
- * 成功返回0,失败返回小于0的错误码。
+ * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
extern int bbq_dequeue_elem(struct bbq * q, void * data);
@@ -197,14 +230,20 @@ extern int bbq_dequeue_elem(struct bbq * q, void * data);
* @param[in] q
* 队列指针
* @param[in] obj_table
- * 即将入队的数组,将数组里的每个成员入队,如:
+ * 将数组里的每个数据入队,如:
* uint16_t obj_table[1024] = {初始化数据}; 传入obj_table
* @param[in] n
* 尝试一次入队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗
* @return
- * 返回实际成功入队个数
+ * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_FULL:队列已满
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
-extern uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n);
+extern uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n, uint32_t * wait_consumed);
/**
* 消息队列批量出队(数据出队),尽可能一次出队n个数据,返回实际成功出队个数
@@ -216,10 +255,16 @@ extern uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, u
* uint16_t obj_table[BUF_CNT] = {0}; 传入(void *)obj_table
* @param[in] n
* 尝试一次出队的个数
+ * @param[out] wait_consumed
+ * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗
* @return
- * 返回实际成功出队个数
+ * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因:
+ * - BBQ_ERR_INPUT_NULL:传入空指针
+ * - BBQ_ERR_EMPTY:队列已空
+ * - BBQ_ERR_BUSY:队列忙碌中
+ * - BBQ_ERR:其它错误
*/
-extern uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n);
+extern uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n, uint32_t * wait_consumed);
/**
* 用于释放消息队列。
@@ -248,15 +293,17 @@ extern void bbq_destory(struct bbq * q);
*/
extern void bbq_debug_struct_print(struct bbq * q);
-// 通用返回码
-#define BBQ_OK 0 // 成功
-#define BBQ_ERROR -1 // 通用错误
-#define BBQ_ALLOC_ERR -2 // 内存分配失败
-#define BBQ_NULL_PTR -3 // 空指针
-#define BBQ_UNKNOWN_TYPE -3 // 未知类型
+// 错误码
+#define BBQ_OK 0 // 成功
+#define BBQ_ERR -1 // 通用错误
+#define BBQ_ERR_ALLOC -2 // 内存分配失败
+#define BBQ_ERR_INPUT_NULL -3 // 传入空指针
+#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方
+#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围
+
+#define BBQ_ERR_FULL -101 // 队列已满(入队失败)
+#define BBQ_ERR_BUSY -102 // 队列忙碌中(入队或出队失败)
+#define BBQ_ERR_EMPTY -103 // 队列已空(出队失败)
+#define BBQ_ERR_NOT_SUPPORT -104 // 不支持的操作
-// 队列错误
-#define BBQ_QUEUE_FULL -1001 // 队列已满(入队失败)
-#define BBQ_QUEUE_BUSY -1002 // 队列忙碌中(入队或出队失败)
-#define BBQ_QUEUE_EMPTY -1003 // 队列已空(出队失败)
-#define BBQ_QUEUE_DATA_ERR -1004 // 传入的数据异常 \ No newline at end of file
+extern __thread int32_t bbq_errno; \ No newline at end of file
diff --git a/infra/include/vnode.h b/infra/include/vnode.h
index 6272851..6f3dcfc 100644
--- a/infra/include/vnode.h
+++ b/infra/include/vnode.h
@@ -12,7 +12,7 @@ extern "C"
#include <common.h>
#include <rte_mbuf.h>
-// #define BBQ_SPSC
+#define BBQ_SPSC
#define __DECLARE_COMMON_VNODE_CREATE_PROD(_type) \
struct vnode_prod * vnode_##_type##_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq);
diff --git a/infra/src/bbq.c b/infra/src/bbq.c
index 992b8f7..6d1a954 100644
--- a/infra/src/bbq.c
+++ b/infra/src/bbq.c
@@ -1,6 +1,6 @@
/*
* @Author: liuyu
- * @LastEditTime: 2024-06-20 18:58:05
+ * @LastEditTime: 2024-06-25 16:49:28
* @Describe: bbq(Block-based Bounded Queue)实现
* 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf
@@ -12,8 +12,8 @@
#include <string.h>
// flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针"
-#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */
-#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */
+#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */
+#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */
// 判断flags标记位
#define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD)
@@ -28,6 +28,8 @@
printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \
} while (0)
+__thread int32_t bbq_errno;
+
struct bbq_status
{
int32_t status; // 返回状态
@@ -46,34 +48,34 @@ enum bbq_queue_state
struct bbq_entry_desc
{
- uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释
- uint64_t off; // entry在当前block的偏移(offset)
+ uint64_t vsn; // allocated或reserved的版本(vsn)
+ uint64_t off; // entry在当前块的偏移(offset)
uint32_t actual_burst; // 实际出/入队个数
- struct bbq_block * block; // 指向所在的block
+ struct bbq_block * block; // 指向所在的块
};
struct bbq_queue_state_s
{
- enum bbq_queue_state state; // 队列状态
- union { // TODO:
- uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效
+ enum bbq_queue_state state; // 队列状态
+ union {
+ uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效
struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效
};
};
-extern inline uint64_t bbq_idx(struct bbq * q, uint64_t x)
+extern inline uint64_t bbq_head_idx(struct bbq * q, uint64_t x)
{
return x & q->idx_mask;
}
-extern inline uint64_t bbq_off(struct bbq * q, uint64_t x)
+extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x)
{
- return x & q->off_mask;
+ return x >> q->idx_bits;
}
-extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x)
+extern inline uint64_t bbq_cur_off(struct bbq * q, uint64_t x)
{
- return x >> q->idx_bits;
+ return x & q->off_mask;
}
extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x)
@@ -81,18 +83,17 @@ extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x)
return x >> q->off_bits;
}
-extern inline uint64_t set_cur_vsn(struct bbq * q, uint64_t ver)
+static inline uint64_t bbq_set_cur_vsn(struct bbq * q, uint64_t ver)
{
return ver << q->off_bits;
}
// 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏
-// #define BBQ_MEMORY
enum bbq_module
{
- BBQ_MODULE_QUEUE = 0,
- BBQ_MODULE_QUEUE_BLOCK_NB,
- BBQ_MODULE_QUEUE_BLOCK_ENTRY,
+ BBQ_MODULE_MAIN = 0,
+ BBQ_MODULE_BLOCK_NB,
+ BBQ_MODULE_BLOCK_ENTRY,
BBQ_MODULE_MAX,
};
@@ -154,7 +155,7 @@ static void bbq_free(enum bbq_module module, int socket_id, void * ptr, size_t s
}
/* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */
-uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd)
+uint64_t bbq_fetch_max(aotmic_uint64 * atom, uint64_t upd)
{
uint64_t old_value = 0;
do
@@ -168,7 +169,7 @@ uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd)
/* 检查参数是否为2的N次幂 */
bool bbq_check_power_of_two(uint32_t n)
{
- if (n <= 0)
+ if (n == 0)
{
return false;
}
@@ -178,7 +179,7 @@ bool bbq_check_power_of_two(uint32_t n)
/* 根据entries大小返回合理的block个数
* 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/
-uint32_t bbq_blocks_calc(uint32_t entries)
+uint32_t bbq_block_number_calc(uint32_t entries)
{
double log_entries = log2((double)entries);
uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整
@@ -192,17 +193,17 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init)
{
#ifdef BBQ_MEMORY
// 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题
- block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size);
+ block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size);
char * last_entry = block->entries + q->entry_size * q->bs;
memset(last_entry, BBQ_MEM_MAGIC, q->entry_size);
#else
- block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size);
+ block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size);
#endif
if (block->entries == NULL)
{
- BBQ_ERR_LOG("bbq_malloc error");
- return BBQ_ALLOC_ERR;
+ bbq_errno = BBQ_ERR_ALLOC;
+ return bbq_errno;
}
block->committed = ATOMIC_VAR_INIT(0);
@@ -218,7 +219,7 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init)
block->reserved = ATOMIC_VAR_INIT(q->bs);
if (BBQ_F_CHK_DROP_OLD(q->flags))
{
- block->consumed = ATOMIC_VAR_INIT(0);
+ block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed
}
else
{
@@ -235,11 +236,9 @@ void block_destory(struct bbq * q, struct bbq_block * block)
if (block->entries)
{
#ifdef BBQ_MEMORY
- bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries,
- sizeof(*block->entries) * (q->bs + 1) * q->entry_size);
+ bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, (q->bs + 1) * q->entry_size);
#else
- bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries,
- sizeof(*block->entries) * q->bs * q->entry_size);
+ bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, q->bs * q->entry_size);
#endif
block->entries = NULL;
}
@@ -247,45 +246,51 @@ void block_destory(struct bbq * q, struct bbq_block * block)
/*
求x在二进制表示中最高位1所在的位置,x参数不能为0。
-例如:x=1,return 0 (...1)
-x=3,return 1 (..11)
-x=9,return 3 (1..1)
+例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1)
*/
-unsigned floor_log2(uint64_t x)
+static unsigned bbq_floor_log2(uint64_t x)
{
- return x == 1 ? 0 : 1 + floor_log2(x >> 1);
+ return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1);
}
/*
返回以2为底x的对数,并向上取整值。
-例如:x=1,return 0 (2^0=1)
-x=99, return 7(2^6=64 2^7=128)
+例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128)
*/
-unsigned ceil_log2(uint64_t x)
+static unsigned bbq_ceil_log2(uint64_t x)
{
- return x == 1 ? 0 : floor_log2(x - 1) + 1;
+ return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1;
}
+// 0-----------------------------------------------------------------------------
/* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */
-static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags)
+static struct bbq * __bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id,
+ uint32_t flags)
{
int ret = 0;
+ bbq_errno = BBQ_OK;
if (bbq_check_power_of_two(bn) == false)
{
- BBQ_ERR_LOG("block number is not power of two, now is :%u", bn);
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
}
if (bbq_check_power_of_two(bs) == false)
{
- BBQ_ERR_LOG("block size is not power of two, now is :%u", bs);
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
}
- if (obj_size == 0)
+ if (name == NULL)
{
- BBQ_ERR_LOG("obj_size is 0");
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return NULL;
+ }
+
+ if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0)
+ {
+ bbq_errno = BBQ_ERR_OUT_OF_RANGE;
return NULL;
}
@@ -295,14 +300,14 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size,
socket_id = BBQ_SOCKET_ID_ANY;
}
- struct bbq * q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q));
+ struct bbq * q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q));
if (q == NULL)
{
- BBQ_ERR_LOG("malloc for bbq queue error");
+ bbq_errno = BBQ_ERR_ALLOC;
return NULL;
}
memset(q, 0, sizeof(*q));
-
+ ret = snprintf(q->name, sizeof(q->name), "%s", name);
q->bn = bn;
q->bs = bs;
q->entry_size = obj_size;
@@ -311,10 +316,10 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size,
q->chead = ATOMIC_VAR_INIT(0);
q->flags = flags;
- q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks));
+ q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks));
if (q->blocks == NULL)
{
- BBQ_ERR_LOG("bbq malloc for blocks error");
+ bbq_errno = BBQ_ERR_ALLOC;
goto error;
}
memset(q->blocks, 0, sizeof(*q->blocks));
@@ -326,13 +331,12 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size,
ret = block_init(q, &(q->blocks[i]), cursor_init);
if (ret != BBQ_OK)
{
- BBQ_ERR_LOG("bbq block init error");
goto error;
}
}
- q->idx_bits = ceil_log2(bn);
- q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位
+ q->idx_bits = bbq_ceil_log2(bn);
+ q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位
q->idx_mask = (1 << q->idx_bits) - 1;
q->off_mask = (1 << q->off_bits) - 1;
@@ -344,44 +348,59 @@ error:
return NULL;
}
-struct bbq * bbq_create_bnbs(uint32_t bn, uint32_t bs, int socket_id, uint32_t flags)
+struct bbq * bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags)
{
- return __bbq_create_bnbs(bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
+ return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR);
}
-struct bbq * bbq_create_bnbs_elem(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags)
+struct bbq * bbq_create_bnbs_elem(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id,
+ uint32_t flags)
{
- return __bbq_create_bnbs(bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
+ return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE);
}
/* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL
*/
-struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_id, uint32_t flags)
+struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags)
{
- if (bbq_check_power_of_two(count) == false || count == 1)
+ bbq_errno = BBQ_OK;
+ if (count <= 1)
+ {
+ bbq_errno = BBQ_ERR_OUT_OF_RANGE;
+ return NULL;
+ }
+
+ if (bbq_check_power_of_two(count) == false)
{
- BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count);
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
}
- uint32_t bn = bbq_blocks_calc(count);
+ uint32_t bn = bbq_block_number_calc(count);
uint32_t bs = count / bn;
- return bbq_create_bnbs_elem(bn, bs, obj_size, socket_id, flags);
+ return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags);
}
-struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags)
+struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags)
{
- if (bbq_check_power_of_two(count) == false || count == 1)
+ bbq_errno = BBQ_OK;
+ if (count <= 1)
+ {
+ bbq_errno = BBQ_ERR_OUT_OF_RANGE;
+ return NULL;
+ }
+
+ if (bbq_check_power_of_two(count) == false)
{
- BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count);
+ bbq_errno = BBQ_ERR_POWER_OF_TWO;
return NULL;
}
- uint32_t bn = bbq_blocks_calc(count);
+ uint32_t bn = bbq_block_number_calc(count);
uint32_t bs = count / bn;
- return bbq_create_bnbs(bn, bs, socket_id, flags);
+ return bbq_create_bnbs(name, bn, bs, socket_id, flags);
}
/* 释放消息队列,与bbq_ring_create系列接口成对*/
@@ -397,8 +416,8 @@ void bbq_destory(struct bbq * q)
block_destory(q, &(q->blocks[i]));
}
- bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks));
- bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q));
+ bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks));
+ bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q));
}
#define BBQ_DATA_TYPE_SINGLE 0x0
@@ -454,7 +473,7 @@ void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data,
struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block, uint32_t n)
{
struct bbq_queue_state_s state = {0};
- if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs)
+ if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs)
{
state.state = BBQ_BLOCK_DONE;
return state;
@@ -465,7 +484,7 @@ struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block
// committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1
uint64_t cur_vsn = bbq_cur_vsn(q, old);
- uint64_t cur_off = bbq_off(q, old);
+ uint64_t cur_off = bbq_cur_off(q, old);
if ((cur_vsn != committed_vsn) || (cur_off >= q->bs))
{
state.state = BBQ_BLOCK_DONE;
@@ -494,14 +513,14 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph)
{
// 获取下一个block
uint64_t cur = 0;
- struct bbq_block * n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]);
+ struct bbq_block * n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]);
uint64_t ph_vsn = bbq_head_vsn(q, ph);
if (BBQ_F_CHK_DROP_OLD(q->flags))
{
cur = atomic_load(&n_blk->committed);
// 生产者避免前进到上一轮中尚未完全提交的区块
- if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs)
+ if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs)
{
return BBQ_NOT_AVAILABLE;
}
@@ -510,14 +529,14 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph)
{
cur = atomic_load(&n_blk->consumed);
uint64_t reserved;
- uint64_t consumed_off = bbq_off(q, cur);
+ uint64_t consumed_off = bbq_cur_off(q, cur);
uint64_t consumed_vsn = bbq_cur_vsn(q, cur);
if (consumed_vsn < ph_vsn || // 生产者赶上了消费者
(consumed_vsn == ph_vsn && consumed_off != q->bs))
{
reserved = atomic_load(&n_blk->reserved);
- if (bbq_off(q, reserved) == consumed_off)
+ if (bbq_cur_off(q, reserved) == consumed_off)
{
return BBQ_NO_ENTRY;
}
@@ -529,54 +548,80 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph)
}
// 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。
- uint64_t new_vsn = set_cur_vsn(q, ph_vsn + 1);
- fetch_max(&n_blk->committed, new_vsn);
- fetch_max(&n_blk->allocated, new_vsn);
+ uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1);
+ bbq_fetch_max(&n_blk->committed, new_vsn);
+ bbq_fetch_max(&n_blk->allocated, new_vsn);
// 索引+1,当超过索引范围,也就是循环下一轮块时,version+1
- fetch_max(&q->phead, ph + 1);
+ bbq_fetch_max(&q->phead, ph + 1);
return BBQ_SUCCESS;
}
-static uint32_t bbq_free_space_set(struct bbq * q, int32_t ret_status, uint64_t ph, struct bbq_block * blk_ph)
+static uint32_t bbq_wait_consumed_set(struct bbq * q, uint64_t * ch_ptr, uint64_t * ph_ptr, struct bbq_block * blk_ph)
{
- if (ret_status == BBQ_QUEUE_FULL)
+ uint64_t ch;
+ uint64_t ph;
+ if (ch_ptr != NULL)
{
- return 0;
+ ch = *ch_ptr;
+ }
+ else
+ {
+ ch = atomic_load(&q->chead);
}
- uint64_t ch = atomic_load(&q->chead);
- struct bbq_block * blk_ch = &(q->blocks[bbq_idx(q, ch)]);
+ if (ph_ptr != NULL)
+ {
+ ph = *ph_ptr;
+ }
+ else
+ {
+ ph = atomic_load(&q->phead);
+ }
- // uint64_t ph_vsn = bbq_head_vsn(q, ph);
- // uint64_t ch_vsn = bbq_head_vsn(q, ch);
- uint64_t ph_idx = bbq_idx(q, ph);
- uint64_t ch_idx = bbq_idx(q, ch);
- uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed));
- uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved));
+ uint64_t ph_idx = bbq_head_idx(q, ph);
+ uint64_t ch_idx = bbq_head_idx(q, ch);
+ uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed));
- if (BBQ_F_CHK_DROP_OLD(q->flags))
+ struct bbq_block * blk_ch = &(q->blocks[bbq_head_idx(q, ch)]);
+ uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved));
+
+ // "生产者"超过"消费者"的索引(即块)的个数
+ uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx;
+ if (!BBQ_F_CHK_DROP_OLD(q->flags))
{
- // TODO
- return 0;
+ // 这里idx_diff-1=-1也是正确。
+ return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
}
- else
+
+ uint64_t ch_vsn = bbq_head_vsn(q, ch);
+ uint64_t ph_vsn = bbq_head_vsn(q, ph);
+
+ if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx)))
{
- // 生产者到消费者的距离
- uint64_t idx_diff = ph_idx >= ch_idx ? q->bn - ph_idx + ch_idx : ch_idx - ph_idx;
- return (idx_diff - 1) * q->bs + (q->bs - committed_off + reserved_off);
+ return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off);
}
+
+ // 发生了覆盖
+ if (ph_idx == ch_idx)
+ {
+ // 当前块以及之前已生产的都作废
+ return 0;
+ }
+
+ return (idx_diff - 1) * q->bs + committed_off;
}
/* 消息队列入队 */
static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32_t n, uint32_t flag,
- uint32_t * free_space)
+ uint32_t * wait_consumed)
{
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || data == NULL)
{
- ret.status = BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ ret.status = bbq_errno;
return ret;
}
@@ -584,7 +629,7 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32
{
// 获取当前phead,转为索引后获取到当前的blk
uint64_t ph = atomic_load(&q->phead);
- struct bbq_block * blk = &(q->blocks[bbq_idx(q, ph)]);
+ struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ph)]);
struct bbq_queue_state_s state = allocate_entry(q, blk, n);
switch (state.state)
@@ -603,27 +648,31 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32
if (pstate == BBQ_NO_ENTRY)
{
- ret.status = BBQ_QUEUE_FULL;
+ bbq_errno = BBQ_ERR_FULL;
+ ret.status = bbq_errno;
}
else if (pstate == BBQ_NOT_AVAILABLE)
{
- ret.status = BBQ_QUEUE_BUSY;
+ bbq_errno = BBQ_ERR_BUSY;
+ ret.status = bbq_errno;
}
else
{
- ret.status = BBQ_ERROR;
+ bbq_errno = BBQ_ERR;
+ ret.status = bbq_errno;
}
break;
}
default:
- ret.status = BBQ_ERROR;
+ bbq_errno = BBQ_ERR;
+ ret.status = bbq_errno;
break;
}
- if (free_space != NULL)
+ if (wait_consumed != NULL)
{
- *free_space = bbq_free_space_set(q, ret.status, ph, blk);
+ *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk);
}
return ret;
@@ -632,24 +681,26 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32
int bbq_enqueue(struct bbq * q, void * const * data)
{
+ bbq_errno = BBQ_OK;
struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
int bbq_enqueue_elem(struct bbq * q, void const * data)
{
+ bbq_errno = BBQ_OK;
struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
/* 更新成功 reserve成功的个数 */
-uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n)
+uint32_t bbq_reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n)
{
// TODO:逻辑可以合并
if (n == 1)
{
// fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新
- if (fetch_max(aotmic, reserved + 1) == reserved)
+ if (bbq_fetch_max(aotmic, reserved + 1) == reserved)
{
return 1;
}
@@ -663,13 +714,13 @@ uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n)
}
}
-struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n)
+struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n)
{
while (true)
{
struct bbq_queue_state_s state;
uint64_t reserved = atomic_load(&block->reserved);
- uint64_t reserved_off = bbq_off(q, reserved);
+ uint64_t reserved_off = bbq_cur_off(q, reserved);
uint64_t reserved_svn = bbq_cur_vsn(q, reserved);
if (reserved_off < q->bs)
@@ -686,7 +737,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block,
}
uint64_t committed = atomic_load(&block->committed);
- uint64_t committed_off = bbq_off(q, committed);
+ uint64_t committed_off = bbq_cur_off(q, committed);
if (committed_off == reserved_off)
{
state.state = BBQ_NO_ENTRY;
@@ -697,7 +748,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block,
if (committed_off != q->bs)
{
uint64_t allocated = atomic_load(&block->allocated);
- if (bbq_off(q, allocated) != committed_off)
+ if (bbq_cur_off(q, allocated) != committed_off)
{
state.state = BBQ_NOT_AVAILABLE;
return state;
@@ -705,7 +756,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block,
}
uint32_t tmp = committed_off - reserved_off;
- uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n);
+ uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n);
if (reserved_cnt > 0)
{ // TODO:多entry时关注
state.state = BBQ_RESERVED;
@@ -791,7 +842,7 @@ bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, u
bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver)
{
- uint64_t ch_idx = bbq_idx(q, ch);
+ uint64_t ch_idx = bbq_head_idx(q, ch);
struct bbq_block * n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]);
uint64_t ch_vsn = bbq_head_vsn(q, ch);
@@ -803,7 +854,7 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver)
// 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1
if (committed_vsn < ver + (ch_idx == 0))
return false;
- fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn));
+ bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn));
}
else
{
@@ -812,82 +863,90 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver)
// 消费者追上了生产者,下一块还未开始生产
return false;
}
- uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1);
- fetch_max(&n_blk->consumed, new_vsn);
- fetch_max(&n_blk->reserved, new_vsn);
+ uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1);
+ bbq_fetch_max(&n_blk->consumed, new_vsn);
+ bbq_fetch_max(&n_blk->reserved, new_vsn);
}
- fetch_max(&q->chead, ch + 1);
+ bbq_fetch_max(&q->chead, ch + 1);
return true;
}
/* 消息队列出队 */
-static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type)
+static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type,
+ uint32_t * wait_consumed)
{
struct bbq_status ret = {.status = 0, .actual_burst = 0};
if (q == NULL || deq_data == NULL)
{
- ret.status = BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ ret.status = bbq_errno;
return ret;
}
while (true)
{
uint64_t ch = atomic_load(&q->chead);
- struct bbq_block * blk = &(q->blocks[bbq_idx(q, ch)]);
-
+ struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ch)]);
struct bbq_queue_state_s state;
- state = reserve_entry(q, blk, n);
+ state = bbq_reserve_entry(q, blk, n);
switch (state.state)
{
case BBQ_RESERVED:
- if (consume_entry(q, &state.e, deq_data, data_type))
- {
- ret.status = BBQ_OK;
- ret.actual_burst = state.e.actual_burst;
- return ret;
- }
- else
+ if (!consume_entry(q, &state.e, deq_data, data_type))
{
continue;
}
+ ret.status = BBQ_OK;
+ ret.actual_burst = state.e.actual_burst;
+ break;
case BBQ_NO_ENTRY:
- ret.status = BBQ_QUEUE_EMPTY;
- return ret;
+ bbq_errno = BBQ_ERR_EMPTY;
+ ret.status = bbq_errno;
+ break;
case BBQ_NOT_AVAILABLE:
- ret.status = BBQ_QUEUE_BUSY;
- return ret;
+ bbq_errno = BBQ_ERR_BUSY;
+ ret.status = bbq_errno;
+ break;
case BBQ_BLOCK_DONE:
if (advance_chead(q, ch, state.vsn))
{
continue;
}
- else
- {
- ret.status = BBQ_QUEUE_EMPTY;
- return ret;
- }
+ bbq_errno = BBQ_ERR_EMPTY;
+ ret.status = bbq_errno;
+ break;
default:
- ret.status = BBQ_ERROR;
- return ret;
+ bbq_errno = BBQ_ERR;
+ ret.status = bbq_errno;
+ break;
+ }
+
+ if (wait_consumed != NULL)
+ {
+ *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk);
}
+
+ return ret;
}
}
int bbq_dequeue(struct bbq * q, void ** data)
{
- struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE);
+ bbq_errno = BBQ_OK;
+ struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
int bbq_dequeue_elem(struct bbq * q, void * data)
{
- struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE);
+ bbq_errno = BBQ_OK;
+ struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL);
return ret.status;
}
-uint32_t bbq_max_burst(struct bbq * q, uint32_t n)
+static uint32_t bbq_max_burst(struct bbq * q, uint32_t n)
{
uint32_t burst = n;
if (burst > q->bs)
@@ -898,16 +957,19 @@ uint32_t bbq_max_burst(struct bbq * q, uint32_t n)
return burst;
}
-static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n)
+static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n,
+ uint32_t * wait_consumed)
{
if (q == NULL || obj_table == NULL)
{
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
if (!BBQ_F_CHK_VALUE(q->flags))
{
- return BBQ_QUEUE_DATA_ERR;
+ bbq_errno = BBQ_ERR_NOT_SUPPORT;
+ return 0;
}
uint32_t burst = 0;
@@ -918,7 +980,7 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab
while (ready < n)
{
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D);
+ ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed);
if (ret.status != BBQ_OK)
{
break;
@@ -930,11 +992,13 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab
return ready;
}
-static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n)
+static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n,
+ uint32_t * wait_consumed)
{
if (q == NULL || obj_table == NULL)
{
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
uint32_t burst = 0;
@@ -945,7 +1009,7 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_ta
while (ready < n)
{
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D);
+ ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed);
if (ret.status != BBQ_OK)
{
break;
@@ -958,16 +1022,19 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_ta
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n)
+static uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n,
+ uint32_t * wait_consumed)
{
if (q == NULL || obj_table == NULL)
{
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
if (!BBQ_F_CHK_VALUE(q->flags))
{
- return BBQ_QUEUE_DATA_ERR;
+ bbq_errno = BBQ_ERR_NOT_SUPPORT;
+ return 0;
}
uint32_t burst = 0;
@@ -978,7 +1045,7 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_tabl
while (ready < n)
{
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, NULL);
+ ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed);
if (ret.status != BBQ_OK)
{
break;
@@ -991,11 +1058,13 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_tabl
}
/* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */
-uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space)
+static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n,
+ uint32_t * wait_consumed)
{
if (q == NULL || obj_table == NULL)
{
- return BBQ_NULL_PTR;
+ bbq_errno = BBQ_ERR_INPUT_NULL;
+ return 0;
}
uint32_t burst = 0;
@@ -1006,7 +1075,7 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_ta
while (ready < n)
{
burst = bbq_max_burst(q, n - ready);
- ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, free_space);
+ ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed);
if (ret.status != BBQ_OK)
{
break;
@@ -1025,8 +1094,8 @@ bool bbq_empty(struct bbq * q)
uint64_t ph_vsn = bbq_head_vsn(q, phead);
uint64_t ch_vsn = bbq_head_vsn(q, chead);
- uint64_t ph_idx = bbq_idx(q, phead);
- uint64_t ch_idx = bbq_idx(q, chead);
+ uint64_t ph_idx = bbq_head_idx(q, phead);
+ uint64_t ch_idx = bbq_head_idx(q, chead);
struct bbq_block * block;
@@ -1038,14 +1107,14 @@ bool bbq_empty(struct bbq * q)
block = &q->blocks[ph_idx];
if (ph_vsn == ch_vsn)
{
- if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed)))
+ if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed)))
{
return true;
}
}
bbq_cursor reserved = atomic_load(&block->reserved);
- uint64_t reserved_off = bbq_off(q, reserved);
+ uint64_t reserved_off = bbq_cur_off(q, reserved);
if (BBQ_F_CHK_DROP_OLD(q->flags) && ph_vsn > ch_vsn && reserved_off != q->bs)
{
@@ -1057,29 +1126,35 @@ bool bbq_empty(struct bbq * q)
return false;
}
-uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n)
+uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n, uint32_t * wait_consumed)
{
- return bbq_enqueue_burst_one_dimensional(q, obj_table, n);
+ bbq_errno = BBQ_OK;
+ return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n)
+uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n,
+ uint32_t * wait_consumed)
{
- return bbq_enqueue_burst_two_dimensional(q, obj_table, n, NULL);
+ bbq_errno = BBQ_OK;
+ return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space)
+uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * wait_consumed)
{
- return bbq_enqueue_burst_two_dimensional(q, obj_table, n, free_space);
+ bbq_errno = BBQ_OK;
+ return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n)
+uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, uint32_t * wait_consumed)
{
- return bbq_dequeue_burst_two_dimensional(q, obj_table, n);
+ bbq_errno = BBQ_OK;
+ return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed);
}
-uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n)
+uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n, uint32_t * wait_consumed)
{
- return bbq_dequeue_burst_one_dimensional(q, obj_table, n);
+ bbq_errno = BBQ_OK;
+ return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed);
}
bool bbq_malloc_free_equal()
@@ -1165,15 +1240,15 @@ void bbq_debug_block_print(struct bbq * q, struct bbq_block * block)
bbq_cursor committed = atomic_load(&block->committed);
bbq_cursor reserved = atomic_load(&block->reserved);
bbq_cursor consumed = atomic_load(&block->consumed);
- printf(" allocated:%lu committed:%lu reserved:%lu", bbq_off(q, allocated), bbq_off(q, committed),
- bbq_off(q, reserved));
+ printf(" allocated:%lu committed:%lu reserved:%lu", bbq_cur_off(q, allocated), bbq_cur_off(q, committed),
+ bbq_cur_off(q, reserved));
if (BBQ_F_CHK_DROP_OLD(q->flags))
{
printf("\n");
}
else
{
- printf(" consumed:%lu\n", bbq_off(q, consumed));
+ printf(" consumed:%lu\n", bbq_cur_off(q, consumed));
}
}
@@ -1183,18 +1258,18 @@ void bbq_debug_struct_print(struct bbq * q)
uint64_t phead = atomic_load(&q->phead);
uint64_t chead = atomic_load(&q->chead);
- printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs);
- printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead));
+ printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs);
+ printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead));
- uint64_t ph_idx = bbq_idx(q, phead);
- uint64_t ch_idx = bbq_idx(q, chead);
+ uint64_t ph_idx = bbq_head_idx(q, phead);
+ uint64_t ch_idx = bbq_head_idx(q, chead);
if (ph_idx != ch_idx)
{
printf("block[%lu]\n", ph_idx);
bbq_debug_block_print(q, &(q->blocks[ph_idx]));
}
- printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead));
+ printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead));
printf("block[%lu]\n", ch_idx);
bbq_debug_block_print(q, &(q->blocks[ch_idx]));
} \ No newline at end of file
diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c
index 489b708..b1643f9 100644
--- a/infra/src/vnode_common.c
+++ b/infra/src/vnode_common.c
@@ -47,8 +47,8 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl
struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc));
MR_VERIFY_MALLOC(desc);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
- desc->tunnel_object = bbq_create(sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
+#ifdef BBQ_SPSC
+ desc->tunnel_object = bbq_create(symbol, sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW);
#else
desc->tunnel_object =
rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ);
@@ -81,7 +81,7 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl
errout:
if (desc->tunnel_object != NULL)
{
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
bbq_destory(desc->tunnel_object);
#else
rte_ring_free(desc->tunnel_object);
@@ -112,7 +112,7 @@ static int tunnel_delete(struct tunnel_desc * desc)
}
struct rte_mbuf * mbuf;
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
while (bbq_dequeue(desc->tunnel_object, (void **)&mbuf) == 0)
#else
while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0)
@@ -128,7 +128,7 @@ static int tunnel_delete(struct tunnel_desc * desc)
infra_rte_pktmbuf_free(desc->rt_buffer[i]);
}
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
MR_VERIFY_2(bbq_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol);
#else
MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol);
@@ -136,7 +136,7 @@ static int tunnel_delete(struct tunnel_desc * desc)
rte_free(desc->en_buffer);
rte_free(desc->rt_buffer);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
bbq_destory(desc->tunnel_object);
#else
rte_ring_free(desc->tunnel_object);
diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h
index ae630a0..0fd1334 100644
--- a/infra/src/vnode_common.h
+++ b/infra/src/vnode_common.h
@@ -26,7 +26,7 @@ struct tunnel_desc
/* Tunnel Name */
char symbol[MR_SYMBOL_MAX];
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
struct bbq * tunnel_object;
#else
/* Tunel Object, real object to hold pointers */
diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c
index 78afc90..9c0261e 100644
--- a/infra/src/vnode_mirror.c
+++ b/infra/src/vnode_mirror.c
@@ -117,7 +117,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE * 2));
}
+#ifdef BBQ_SPSC
+ uint32_t wait_consumed = 0;
+#else
unsigned int n_free_space = 0;
+#endif
unsigned int n_send = 0;
bool is_acquire_credit_success = dist_tunnel_acquire_credits(prod->vnode, desc, (int32_t)n_to_send);
@@ -125,8 +129,8 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
/* acquire credit, if failed, drop all the packets */
if (likely(is_acquire_credit_success))
{
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
- n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space);
+#ifdef BBQ_SPSC
+ n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &wait_consumed);
#else
n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space);
#endif
@@ -174,7 +178,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons
desc->total_len += n_send_len;
/* q_len */
+#ifdef BBQ_SPSC
+ desc->q_len = wait_consumed;
+#else
desc->q_len = desc->tunnel_size - n_free_space;
+#endif
desc->q_len_avg += 0.2F * ((float)desc->q_len - desc->q_len_avg);
// clear the buffer
@@ -223,8 +231,8 @@ out:
static inline int dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj,
unsigned int nr_max_obj)
{
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
- unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj);
+#ifdef BBQ_SPSC
+ unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL);
#else
unsigned int nr_deq = rte_ring_sc_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL);
#endif
diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc
index 7101e71..fae95a8 100644
--- a/infra/test/TestVNode.cc
+++ b/infra/test/TestVNode.cc
@@ -242,7 +242,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objects, RTE_DIM(rt_objects));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 480);
#else
EXPECT_EQ(rt_ret, 481);
@@ -256,7 +256,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objects, RTE_DIM(rt_objects));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 480);
#else
EXPECT_EQ(rt_ret, 481);
@@ -271,7 +271,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 2, rt_objects, RTE_DIM(rt_objects));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 480);
#else
EXPECT_EQ(rt_ret, 481);
@@ -286,7 +286,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT];
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 3, rt_objects, RTE_DIM(rt_objects));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 480);
#else
EXPECT_EQ(rt_ret, 481);
@@ -315,7 +315,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
EXPECT_EQ(prod_on_line_total, 2048);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(prod_deliver_total, 128);
EXPECT_EQ(prod_missed_total, 1920);
#else
@@ -326,7 +326,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
/* on cons side */
struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons);
EXPECT_EQ(cons_stat[0].on_line, 2048);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(cons_stat[0].deliver, 128);
EXPECT_EQ(cons_stat[0].missed, 1920);
#else
@@ -336,7 +336,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue)
struct rte_mbuf * deq_objs[TEST_MBUFS_COUNT];
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(deq_ret, 128);
#else
EXPECT_EQ(deq_ret, 124);
@@ -379,7 +379,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
EXPECT_EQ(enq_ret, 0);
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 480);
#else
EXPECT_EQ(rt_ret, 481);
@@ -397,7 +397,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
/* until here, we have 31 objects enqueue, so only 544 mbufs can be enqueue. */
int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(deq_ret_1, 32);
#else
EXPECT_EQ(deq_ret_1, 31);
@@ -405,7 +405,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod);
EXPECT_EQ(prod_stat->on_line, 1024);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(prod_stat->deliver, 32);
EXPECT_EQ(prod_stat->missed, 992);
#else
@@ -415,7 +415,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons);
EXPECT_EQ(cons_stat->on_line, 1024);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(cons_stat->deliver, 32);
EXPECT_EQ(cons_stat->missed, 992);
#else
@@ -434,7 +434,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
/* get the packet needs to be free */
int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret_3, 480);
#else
EXPECT_EQ(rt_ret_3, 481);
@@ -449,7 +449,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
rte_pktmbuf_free_bulk(deq_objs, rt_ret_4);
int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(deq_ret_2, 32);
#else
EXPECT_EQ(deq_ret_2, 31);
@@ -459,7 +459,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
cons_stat = vnode_mirror_cons_stat_get(cons);
EXPECT_EQ(prod_stat->on_line, 2048);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(prod_stat->deliver, 64);
EXPECT_EQ(prod_stat->missed, 1984);
#else
@@ -468,7 +468,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict)
#endif
EXPECT_EQ(cons_stat->on_line, 2048);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(cons_stat->deliver, 64);
EXPECT_EQ(cons_stat->missed, 1984);
#else
@@ -533,7 +533,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
/* retrieve the drop packets */
struct rte_mbuf * rt_objs[128] = {};
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objs, 128);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 32);
#else
EXPECT_EQ(rt_ret, 33);
@@ -550,7 +550,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
/* retrieve the drop packets */
rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objs, 128);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 32);
#else
EXPECT_EQ(rt_ret, 33);
@@ -560,7 +560,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict)
/* dequeue */
struct rte_mbuf * deq_objs[128] = {};
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(deq_ret, 64);
#else
EXPECT_EQ(deq_ret, 62);
@@ -607,7 +607,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue)
EXPECT_EQ(enq_ret, 0);
int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(rt_ret, 0);
#else
EXPECT_EQ(rt_ret, 1);
@@ -640,7 +640,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue)
rte_pktmbuf_free_bulk(deq_objs, 512);
int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs));
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(deq_ret, 512);
#else
EXPECT_EQ(deq_ret, 511);
@@ -1055,7 +1055,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue)
EXPECT_EQ(cons_stat[0].deliver, 32);
EXPECT_EQ(cons_stat[0].missed, 0);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
// 32/8=4
EXPECT_EQ(cons_stat[0].q_len_max, 4);
EXPECT_LE(cons_stat[0].q_len_avg_max, 4);
@@ -1076,7 +1076,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue)
EXPECT_EQ(cons_stat[0].on_line, 32);
EXPECT_EQ(cons_stat[0].deliver, 32);
EXPECT_EQ(cons_stat[0].missed, 0);
-#if defined(BBQ_SPSC) || defined(BBQ_MPSC)
+#ifdef BBQ_SPSC
EXPECT_EQ(cons_stat[0].q_len_max, 4);
EXPECT_LE(cons_stat[0].q_len_avg_max, 4); // TODO:check???
#else