diff options
| author | liuyu <[email protected]> | 2024-06-26 11:37:09 +0800 |
|---|---|---|
| committer | liuyu <[email protected]> | 2024-07-03 02:53:11 -0400 |
| commit | 659ef978fa00e685c4878ebd6497ac724f97b267 (patch) | |
| tree | aeee4281a2e479e7e8deae9cb1b7143e344fb1c3 | |
| parent | 9b6cecff37bea3237619445d92787d001164b8c7 (diff) | |
bugfix:更新bbq库实现,添加error number,修复burst失败时返回非0bug。
| -rw-r--r-- | infra/include/bbq.h | 177 | ||||
| -rw-r--r-- | infra/include/vnode.h | 2 | ||||
| -rw-r--r-- | infra/src/bbq.c | 437 | ||||
| -rw-r--r-- | infra/src/vnode_common.c | 12 | ||||
| -rw-r--r-- | infra/src/vnode_common.h | 2 | ||||
| -rw-r--r-- | infra/src/vnode_mirror.c | 16 | ||||
| -rw-r--r-- | infra/test/TestVNode.cc | 44 |
7 files changed, 410 insertions, 280 deletions
diff --git a/infra/include/bbq.h b/infra/include/bbq.h index 6e4d84e..d8afb3d 100644 --- a/infra/include/bbq.h +++ b/infra/include/bbq.h @@ -1,6 +1,6 @@ /* * @Author: [email protected] - * @LastEditTime: 2024-06-20 18:03:59 + * @LastEditTime: 2024-06-25 16:49:38 * @Describe: bbq(Block-based Bounded Queue)头文件 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf */ @@ -26,6 +26,7 @@ using aotmic_uint64 = std::atomic<uint64_t>; #define BBQ_SOCKET_ID_ANY -1 #define __BBQ_CACHE_ALIGNED __attribute__((__aligned__(64))) +#define BBQ_SYMBOL_MAX 64 struct bbq_block { @@ -33,43 +34,40 @@ struct bbq_block bbq_cursor allocated; // 已分配(version|offset) bbq_cursor reserved; // 已预留(version|offset) bbq_cursor consumed; // 已消费(version|offset)注:在drop-old模式下没用到 - char * entries; // 存储大小可变的entry,分配空间大小:bs * entry_size + char * entries; // 存储大小可变的entry,每个块分配空间:bs * entry_size } __BBQ_CACHE_ALIGNED; struct bbq { - size_t bn; // blocks的个数 - size_t bs; // blocks.entries的个数 - size_t entry_size; // blocks.entries里每个entry的大小 + char name[BBQ_SYMBOL_MAX] __BBQ_CACHE_ALIGNED; - int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 - uint32_t flags; // 标记:retry new 模式,还是drop old模式 - uint32_t idx_bits; // bbq_head里idx所占的位数 - uint32_t off_bits; // bbq_cursor里offset所占的位数 - - uint64_t idx_mask; // idx_bits偏移后的掩码 - uint64_t off_mask; // off_bits偏移后的掩码 - bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx - bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx + int32_t socket_id; // 用于libnuma分配内存,socket_id小于0将使用malloc分配 + uint32_t bn; // blocks的个数 + uint32_t bs; // blocks.entries的个数 + uint32_t flags; // 标记:retry new 模式,还是drop old模式 + uint32_t idx_bits; // bbq_head里idx所占的位数 + uint32_t off_bits; // bbq_cursor里offset所占的位数 + uint64_t idx_mask; // idx_bits偏移后的掩码 + uint64_t off_mask; // off_bits偏移后的掩码 + uint64_t entry_size; // blocks.entries里每个entry的大小 + bbq_head phead; // 生产者头,指向块的索引,分为两部分:version|idx + bbq_head chead; // 消费者头,指向块的索引,分为两部分:version|idx struct bbq_block * blocks; // bn大小的数组 -}; - -#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ -// #define BBQ_F_SP_ENQ 0x0004 /**< 创建队列时设置为单生产者 */ -// #define BBQ_F_SC_DEQ 0x0008 /**< 创建队列时设置为单消费者 */ +} __BBQ_CACHE_ALIGNED; #define BBQ_F_DEFAULT 0x0 +#define BBQ_F_DROP_OLD 0x0002 /**< 创建队列时设置为drop old模式(队列满时,入队成功并覆盖旧数据) */ #define BBQ_F_RETRY_NEW BBQ_F_DEFAULT /**< 创建队列时设置为retry new模式(队列满时,当前入队失败) */ -#define BBQ_F_MP_ENQ BBQ_F_DEFAULT /**< 创建队列时设置为多生产者 */ -#define BBQ_F_MC_DEQ BBQ_F_DEFAULT /**< 创建队列时设置为多消费者 */ /** * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针入队。 * 对应入队函数:bbq_enqueue、bbq_enqueue_burst * 对应出队函数:bbq_dequeue、bbq_dequeue_burst * + * @param[in] name + * 队列名称 * @param[in] count - * 队列所有entry的个数,count必须大于1,且是2的N次方。 + * 队列大小,参数必须大于1,且是2的N次方。 * @param[in] socket_id * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 * 当检测到不支持多numa,将转为malloc分配内存。 @@ -79,9 +77,13 @@ struct bbq * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 - * NULL:创建失败。 + * NULL:创建失败,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围 + * - BBQ_ERR_ALLOC:申请内存失败 + * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 + * - BBQ_ERR_INPUT_NULL:name传入空指针 */ -extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags); +extern struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags); /** * 消息队列单个指针入队 @@ -89,10 +91,14 @@ extern struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags); * @param[in] q * 队列指针 * @param[in] data - * 则传入一维指针,如: - * int *data = malloc(sizeof(int));*data = 1; 传入&data + * 指向入队指针的指针,如: + * int *data = malloc(sizeof(int));*data = TEST_DATA; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可能存在以下错误码: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_enqueue(struct bbq * q, void * const * data); @@ -102,15 +108,19 @@ extern int bbq_enqueue(struct bbq * q, void * const * data); * @param[in] q * 队列指针 * @param[out] data - * 则传入二维指针,如: + * 传入二级指针,如: * int *data = NULL; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_dequeue(struct bbq * q, void ** data); /** - * 消息队列批量指针入队,尽可能一次入队n个指针,返回实际成功入队个数 + * 消息队列批量入队(指针入队),尽可能一次入队n个指针,返回实际成功入队个数 * * @param[in] q * 队列指针 @@ -119,41 +129,52 @@ extern int bbq_dequeue(struct bbq * q, void ** data); * uint16_t **obj_table = malloc(sizeof(uint16_t **) * BUF_CNT); * for(int i=0;i<BUF_CNT;i++){ * obj_table[i] = malloc(sizeof(uint16_t)); - * obj_table[i] = 1; + * obj_table[i] = TEST_DATA; * } * 传入obj_table * @param[in] n * 尝试一次入队的个数 - * @param[out] free_space - * 如果为非NULL,返回执行完当前操作后剩余的entry个数 + * @param[out] wait_consumed + * 如果为非NULL,返回当前队列剩余的个数。注:该赋值可能会带来些许的性能损耗。 * @return - * 返回实际成功入队的个数 + * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ -extern uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space); +extern uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * wait_consumed); /** - * 消息队列批量出队(指针出队),尽可能一次出队n个数据,返回实际成功出队个数 + * 消息队列批量指针出队,尽可能一次出队n个数据,返回实际成功出队个数 * * @param[in] q * 队列指针 * @param[out] obj_table - * 存储出队的指针,如: - * uint16_t **obj_table = malloc(sizeof(uint16_t *)) - * 传入obj_table + * 用于存储出队的指针,如: + * uint16_t **obj_table = malloc(sizeof(uint16_t *)); 传入obj_table * @param[in] n * 尝试一次出队的个数 + * @param[out] wait_consumed + * 如果为非NULL,返回当前队列中,已入队的个数。注:该赋值可能会带来些许的性能损耗 * @return - * 返回实际成功出队的个数 + * 返回实际成功出队的个数。当出队返回0时,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ -extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n); +extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, uint32_t * wait_consumed); /** - * 创建bbq队列,使用当前函数创建的队列,后续操作会把指针指向的数据拷贝入队。 + * 创建bbq队列,使用当前函数创建的队列,在后续操作会把指针指向的数据拷贝入队。 * 对应入队函数:bbq_enqueue_elem、bbq_enqueue_burst_elem * 对应出队函数:bbq_dequeue_elem、bbq_dequeue_burst_elem * + * @param[in] name + * 队列名称 * @param[in] count - * 队列所有entry的个数,count必须大于1,且是2的N次方。 + * 队列大小,参数必须大于1,且是2的N次方。 * @param[in] socket_id * 多numa架构下,调用libnuma库函数针对指定socket分配内存。 * 当检测到不支持多numa,将转为malloc分配内存。 @@ -163,9 +184,13 @@ extern uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n) * 2)BBQ_F_DROP_OLD:队列满时,覆盖旧数据,入队成功 * @return * 非NULL:消息队列结构体指针,用于后续出队入队等操作。 - * NULL:创建失败。 + * NULL:创建失败。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_OUT_OF_RANGE:name或count参数超出范围 + * - BBQ_ERR_ALLOC:申请内存失败 + * - BBQ_ERR_POWER_OF_TWO:count不为2的n次方 + * - BBQ_ERR_INPUT_NULL:name传入空指针 */ -extern struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_id, uint32_t flags); +extern struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags); /** * 消息队列单个数据入队(指针指向的数据将被拷贝) @@ -173,9 +198,13 @@ extern struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_ * @param[in] q * 队列指针 * @param[in] data - * 传入一维指针,如:int data = 1; 传入&data + * 传入一级指针,如:int data = 1; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_enqueue_elem(struct bbq * q, void const * data); @@ -185,9 +214,13 @@ extern int bbq_enqueue_elem(struct bbq * q, void const * data); * @param[in] q * 队列指针 * @param[in] data - * 则传入一维指针,如:int data; 传入&data + * 则传入一级指针,如:int data; 传入&data * @return - * 成功返回0,失败返回小于0的错误码。 + * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ extern int bbq_dequeue_elem(struct bbq * q, void * data); @@ -197,14 +230,20 @@ extern int bbq_dequeue_elem(struct bbq * q, void * data); * @param[in] q * 队列指针 * @param[in] obj_table - * 即将入队的数组,将数组里的每个成员入队,如: + * 将数组里的每个数据入队,如: * uint16_t obj_table[1024] = {初始化数据}; 传入obj_table * @param[in] n * 尝试一次入队的个数 + * @param[out] wait_consumed + * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗 * @return - * 返回实际成功入队个数 + * 返回实际成功入队的个数。当入队返回0时,可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_FULL:队列已满 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ -extern uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n); +extern uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n, uint32_t * wait_consumed); /** * 消息队列批量出队(数据出队),尽可能一次出队n个数据,返回实际成功出队个数 @@ -216,10 +255,16 @@ extern uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, u * uint16_t obj_table[BUF_CNT] = {0}; 传入(void *)obj_table * @param[in] n * 尝试一次出队的个数 + * @param[out] wait_consumed + * 如果为非NULL,返回当前队列中,已入队的个数。。注:该赋值可能会带来些许的性能损耗 * @return - * 返回实际成功出队个数 + * 成功返回0,失败返回小于0的错误码。可通过bbq_errno分析具体错误原因: + * - BBQ_ERR_INPUT_NULL:传入空指针 + * - BBQ_ERR_EMPTY:队列已空 + * - BBQ_ERR_BUSY:队列忙碌中 + * - BBQ_ERR:其它错误 */ -extern uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n); +extern uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n, uint32_t * wait_consumed); /** * 用于释放消息队列。 @@ -248,15 +293,17 @@ extern void bbq_destory(struct bbq * q); */ extern void bbq_debug_struct_print(struct bbq * q); -// 通用返回码 -#define BBQ_OK 0 // 成功 -#define BBQ_ERROR -1 // 通用错误 -#define BBQ_ALLOC_ERR -2 // 内存分配失败 -#define BBQ_NULL_PTR -3 // 空指针 -#define BBQ_UNKNOWN_TYPE -3 // 未知类型 +// 错误码 +#define BBQ_OK 0 // 成功 +#define BBQ_ERR -1 // 通用错误 +#define BBQ_ERR_ALLOC -2 // 内存分配失败 +#define BBQ_ERR_INPUT_NULL -3 // 传入空指针 +#define BBQ_ERR_POWER_OF_TWO -4 // 不是2的n次方 +#define BBQ_ERR_OUT_OF_RANGE -5 // 超出范围 + +#define BBQ_ERR_FULL -101 // 队列已满(入队失败) +#define BBQ_ERR_BUSY -102 // 队列忙碌中(入队或出队失败) +#define BBQ_ERR_EMPTY -103 // 队列已空(出队失败) +#define BBQ_ERR_NOT_SUPPORT -104 // 不支持的操作 -// 队列错误 -#define BBQ_QUEUE_FULL -1001 // 队列已满(入队失败) -#define BBQ_QUEUE_BUSY -1002 // 队列忙碌中(入队或出队失败) -#define BBQ_QUEUE_EMPTY -1003 // 队列已空(出队失败) -#define BBQ_QUEUE_DATA_ERR -1004 // 传入的数据异常
\ No newline at end of file +extern __thread int32_t bbq_errno;
\ No newline at end of file diff --git a/infra/include/vnode.h b/infra/include/vnode.h index 6272851..6f3dcfc 100644 --- a/infra/include/vnode.h +++ b/infra/include/vnode.h @@ -12,7 +12,7 @@ extern "C" #include <common.h> #include <rte_mbuf.h> -// #define BBQ_SPSC +#define BBQ_SPSC #define __DECLARE_COMMON_VNODE_CREATE_PROD(_type) \ struct vnode_prod * vnode_##_type##_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq); diff --git a/infra/src/bbq.c b/infra/src/bbq.c index 992b8f7..6d1a954 100644 --- a/infra/src/bbq.c +++ b/infra/src/bbq.c @@ -1,6 +1,6 @@ /* * @Author: liuyu - * @LastEditTime: 2024-06-20 18:58:05 + * @LastEditTime: 2024-06-25 16:49:28 * @Email: [email protected] * @Describe: bbq(Block-based Bounded Queue)实现 * 参考:https://www.usenix.org/system/files/atc22-wang-jiawei.pdf @@ -12,8 +12,8 @@ #include <string.h> // flags第1位控制入队时的数据拷贝策略,默认是"拷贝指针" -#define BBQ_F_COPY_PTR 0x0 /**< 默认为拷贝指针 */ -#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ +#define BBQ_F_COPY_PTR BBQ_F_DEFAULT /**< 默认为拷贝指针 */ +#define BBQ_F_COPY_VALUE 0x0001 /**< 创建队列时设置为拷贝数值 */ // 判断flags标记位 #define BBQ_F_CHK_DROP_OLD(flags) (flags & BBQ_F_DROP_OLD) @@ -28,6 +28,8 @@ printf("\x1b[31m [ERR][%s:%d:%s]" fmt "\x1b[0m\n", __func__, __LINE__, __FILE__, ##__VA_ARGS__); \ } while (0) +__thread int32_t bbq_errno; + struct bbq_status { int32_t status; // 返回状态 @@ -46,34 +48,34 @@ enum bbq_queue_state struct bbq_entry_desc { - uint64_t vsn; // allocated游标的版本(vsn) TODO:修正注释 - uint64_t off; // entry在当前block的偏移(offset) + uint64_t vsn; // allocated或reserved的版本(vsn) + uint64_t off; // entry在当前块的偏移(offset) uint32_t actual_burst; // 实际出/入队个数 - struct bbq_block * block; // 指向所在的block + struct bbq_block * block; // 指向所在的块 }; struct bbq_queue_state_s { - enum bbq_queue_state state; // 队列状态 - union { // TODO: - uint64_t vsn; // reserve_entry state==BLOCK_DONE时生效 + enum bbq_queue_state state; // 队列状态 + union { + uint64_t vsn; // bbq_reserve_entry state==BLOCK_DONE时生效 struct bbq_entry_desc e; // state为ALLOCATED、RESERVED生效 }; }; -extern inline uint64_t bbq_idx(struct bbq * q, uint64_t x) +extern inline uint64_t bbq_head_idx(struct bbq * q, uint64_t x) { return x & q->idx_mask; } -extern inline uint64_t bbq_off(struct bbq * q, uint64_t x) +extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x) { - return x & q->off_mask; + return x >> q->idx_bits; } -extern inline uint64_t bbq_head_vsn(struct bbq * q, uint64_t x) +extern inline uint64_t bbq_cur_off(struct bbq * q, uint64_t x) { - return x >> q->idx_bits; + return x & q->off_mask; } extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x) @@ -81,18 +83,17 @@ extern inline uint64_t bbq_cur_vsn(struct bbq * q, uint64_t x) return x >> q->off_bits; } -extern inline uint64_t set_cur_vsn(struct bbq * q, uint64_t ver) +static inline uint64_t bbq_set_cur_vsn(struct bbq * q, uint64_t ver) { return ver << q->off_bits; } // 当BBQ_MEMORY宏定义开关打开,将对内存分配释放进行统计,方便排查内存泄漏 -// #define BBQ_MEMORY enum bbq_module { - BBQ_MODULE_QUEUE = 0, - BBQ_MODULE_QUEUE_BLOCK_NB, - BBQ_MODULE_QUEUE_BLOCK_ENTRY, + BBQ_MODULE_MAIN = 0, + BBQ_MODULE_BLOCK_NB, + BBQ_MODULE_BLOCK_ENTRY, BBQ_MODULE_MAX, }; @@ -154,7 +155,7 @@ static void bbq_free(enum bbq_module module, int socket_id, void * ptr, size_t s } /* 原子的比较两个值大小,并设置较大的值,成功则返回设置前的旧值 */ -uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd) +uint64_t bbq_fetch_max(aotmic_uint64 * atom, uint64_t upd) { uint64_t old_value = 0; do @@ -168,7 +169,7 @@ uint64_t fetch_max(aotmic_uint64 * atom, uint64_t upd) /* 检查参数是否为2的N次幂 */ bool bbq_check_power_of_two(uint32_t n) { - if (n <= 0) + if (n == 0) { return false; } @@ -178,7 +179,7 @@ bool bbq_check_power_of_two(uint32_t n) /* 根据entries大小返回合理的block个数 * 计算公式:log2(blocks) = max(1, ⌊log2(count)/4⌋) 注:⌊ ⌋代表向下取整。*/ -uint32_t bbq_blocks_calc(uint32_t entries) +uint32_t bbq_block_number_calc(uint32_t entries) { double log_entries = log2((double)entries); uint32_t over4 = (uint32_t)(log_entries / 4); // 向下取整 @@ -192,17 +193,17 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) { #ifdef BBQ_MEMORY // 末尾多分配一个entry(永远不应该被修改),以此检查是否存在写越界的问题 - block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, (q->bs + 1) * q->entry_size); char * last_entry = block->entries + q->entry_size * q->bs; memset(last_entry, BBQ_MEM_MAGIC, q->entry_size); #else - block->entries = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); + block->entries = bbq_malloc(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, q->bs * q->entry_size); #endif if (block->entries == NULL) { - BBQ_ERR_LOG("bbq_malloc error"); - return BBQ_ALLOC_ERR; + bbq_errno = BBQ_ERR_ALLOC; + return bbq_errno; } block->committed = ATOMIC_VAR_INIT(0); @@ -218,7 +219,7 @@ int block_init(struct bbq * q, struct bbq_block * block, bool cursor_init) block->reserved = ATOMIC_VAR_INIT(q->bs); if (BBQ_F_CHK_DROP_OLD(q->flags)) { - block->consumed = ATOMIC_VAR_INIT(0); + block->consumed = ATOMIC_VAR_INIT(0); // drop old模式下用不到consumed } else { @@ -235,11 +236,9 @@ void block_destory(struct bbq * q, struct bbq_block * block) if (block->entries) { #ifdef BBQ_MEMORY - bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, - sizeof(*block->entries) * (q->bs + 1) * q->entry_size); + bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, (q->bs + 1) * q->entry_size); #else - bbq_free(BBQ_MODULE_QUEUE_BLOCK_ENTRY, q->socket_id, block->entries, - sizeof(*block->entries) * q->bs * q->entry_size); + bbq_free(BBQ_MODULE_BLOCK_ENTRY, q->socket_id, block->entries, q->bs * q->entry_size); #endif block->entries = NULL; } @@ -247,45 +246,51 @@ void block_destory(struct bbq * q, struct bbq_block * block) /* 求x在二进制表示中最高位1所在的位置,x参数不能为0。 -例如:x=1,return 0 (...1) -x=3,return 1 (..11) -x=9,return 3 (1..1) +例如:x=1,return 0 (...1); x=3,return 1 (..11); x=9,return 3 (1..1) */ -unsigned floor_log2(uint64_t x) +static unsigned bbq_floor_log2(uint64_t x) { - return x == 1 ? 0 : 1 + floor_log2(x >> 1); + return x == 1 ? 0 : 1 + bbq_floor_log2(x >> 1); } /* 返回以2为底x的对数,并向上取整值。 -例如:x=1,return 0 (2^0=1) -x=99, return 7(2^6=64 2^7=128) +例如:x=1,return 0 (2^0=1); x=99, return 7(2^6=64 2^7=128) */ -unsigned ceil_log2(uint64_t x) +static unsigned bbq_ceil_log2(uint64_t x) { - return x == 1 ? 0 : floor_log2(x - 1) + 1; + return x == 1 ? 0 : bbq_floor_log2(x - 1) + 1; } +// 0----------------------------------------------------------------------------- /* 创建消息队列,bn和bs必须是2的N次幂,socket_id用于多numa分配内存 */ -static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) +static struct bbq * __bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, + uint32_t flags) { int ret = 0; + bbq_errno = BBQ_OK; if (bbq_check_power_of_two(bn) == false) { - BBQ_ERR_LOG("block number is not power of two, now is :%u", bn); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } if (bbq_check_power_of_two(bs) == false) { - BBQ_ERR_LOG("block size is not power of two, now is :%u", bs); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - if (obj_size == 0) + if (name == NULL) { - BBQ_ERR_LOG("obj_size is 0"); + bbq_errno = BBQ_ERR_INPUT_NULL; + return NULL; + } + + if (strlen(name) >= BBQ_SYMBOL_MAX - 1 || obj_size == 0) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; return NULL; } @@ -295,14 +300,14 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, socket_id = BBQ_SOCKET_ID_ANY; } - struct bbq * q = bbq_malloc(BBQ_MODULE_QUEUE, socket_id, sizeof(*q)); + struct bbq * q = bbq_malloc(BBQ_MODULE_MAIN, socket_id, sizeof(*q)); if (q == NULL) { - BBQ_ERR_LOG("malloc for bbq queue error"); + bbq_errno = BBQ_ERR_ALLOC; return NULL; } memset(q, 0, sizeof(*q)); - + ret = snprintf(q->name, sizeof(q->name), "%s", name); q->bn = bn; q->bs = bs; q->entry_size = obj_size; @@ -311,10 +316,10 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, q->chead = ATOMIC_VAR_INIT(0); q->flags = flags; - q->blocks = bbq_malloc(BBQ_MODULE_QUEUE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); + q->blocks = bbq_malloc(BBQ_MODULE_BLOCK_NB, socket_id, bn * sizeof(*q->blocks)); if (q->blocks == NULL) { - BBQ_ERR_LOG("bbq malloc for blocks error"); + bbq_errno = BBQ_ERR_ALLOC; goto error; } memset(q->blocks, 0, sizeof(*q->blocks)); @@ -326,13 +331,12 @@ static struct bbq * __bbq_create_bnbs(uint32_t bn, uint32_t bs, size_t obj_size, ret = block_init(q, &(q->blocks[i]), cursor_init); if (ret != BBQ_OK) { - BBQ_ERR_LOG("bbq block init error"); goto error; } } - q->idx_bits = ceil_log2(bn); - q->off_bits = ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 + q->idx_bits = bbq_ceil_log2(bn); + q->off_bits = bbq_ceil_log2(bs) + 1; // 多线程同时add,可能超过bs的问题,因此多分配一位 q->idx_mask = (1 << q->idx_bits) - 1; q->off_mask = (1 << q->off_bits) - 1; @@ -344,44 +348,59 @@ error: return NULL; } -struct bbq * bbq_create_bnbs(uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) +struct bbq * bbq_create_bnbs(const char * name, uint32_t bn, uint32_t bs, int socket_id, uint32_t flags) { - return __bbq_create_bnbs(bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); + return __bbq_create_bnbs(name, bn, bs, sizeof(void *), socket_id, flags | BBQ_F_COPY_PTR); } -struct bbq * bbq_create_bnbs_elem(uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, uint32_t flags) +struct bbq * bbq_create_bnbs_elem(const char * name, uint32_t bn, uint32_t bs, size_t obj_size, int socket_id, + uint32_t flags) { - return __bbq_create_bnbs(bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); + return __bbq_create_bnbs(name, bn, bs, obj_size, socket_id, flags | BBQ_F_COPY_VALUE); } /* 创建消息队列,count必须大于1,且是2的N次幂,bn和bs将根据count值自动计算,socket_id用于多numa分配内存,free_func先设置NULL */ -struct bbq * bbq_create_elem(uint32_t count, size_t obj_size, int socket_id, uint32_t flags) +struct bbq * bbq_create_elem(const char * name, uint32_t count, size_t obj_size, int socket_id, uint32_t flags) { - if (bbq_check_power_of_two(count) == false || count == 1) + bbq_errno = BBQ_OK; + if (count <= 1) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; + return NULL; + } + + if (bbq_check_power_of_two(count) == false) { - BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - uint32_t bn = bbq_blocks_calc(count); + uint32_t bn = bbq_block_number_calc(count); uint32_t bs = count / bn; - return bbq_create_bnbs_elem(bn, bs, obj_size, socket_id, flags); + return bbq_create_bnbs_elem(name, bn, bs, obj_size, socket_id, flags); } -struct bbq * bbq_create(uint32_t count, int socket_id, uint32_t flags) +struct bbq * bbq_create(const char * name, uint32_t count, int socket_id, uint32_t flags) { - if (bbq_check_power_of_two(count) == false || count == 1) + bbq_errno = BBQ_OK; + if (count <= 1) + { + bbq_errno = BBQ_ERR_OUT_OF_RANGE; + return NULL; + } + + if (bbq_check_power_of_two(count) == false) { - BBQ_ERR_LOG("bbq entries number must be power of two and greater than 1, now is :%u", count); + bbq_errno = BBQ_ERR_POWER_OF_TWO; return NULL; } - uint32_t bn = bbq_blocks_calc(count); + uint32_t bn = bbq_block_number_calc(count); uint32_t bs = count / bn; - return bbq_create_bnbs(bn, bs, socket_id, flags); + return bbq_create_bnbs(name, bn, bs, socket_id, flags); } /* 释放消息队列,与bbq_ring_create系列接口成对*/ @@ -397,8 +416,8 @@ void bbq_destory(struct bbq * q) block_destory(q, &(q->blocks[i])); } - bbq_free(BBQ_MODULE_QUEUE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); - bbq_free(BBQ_MODULE_QUEUE, q->socket_id, q, sizeof(*q)); + bbq_free(BBQ_MODULE_BLOCK_NB, q->socket_id, q->blocks, q->bn * sizeof(*q->blocks)); + bbq_free(BBQ_MODULE_MAIN, q->socket_id, q, sizeof(*q)); } #define BBQ_DATA_TYPE_SINGLE 0x0 @@ -454,7 +473,7 @@ void commit_entry(struct bbq * q, struct bbq_entry_desc * e, void const * data, struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block, uint32_t n) { struct bbq_queue_state_s state = {0}; - if (bbq_off(q, atomic_load(&block->allocated)) >= q->bs) + if (bbq_cur_off(q, atomic_load(&block->allocated)) >= q->bs) { state.state = BBQ_BLOCK_DONE; return state; @@ -465,7 +484,7 @@ struct bbq_queue_state_s allocate_entry(struct bbq * q, struct bbq_block * block // committed_vsn在当前块被初始化后值是不变的,通过比较vsn值,来判断allocated的off是否溢出了,导致vsn+1 uint64_t cur_vsn = bbq_cur_vsn(q, old); - uint64_t cur_off = bbq_off(q, old); + uint64_t cur_off = bbq_cur_off(q, old); if ((cur_vsn != committed_vsn) || (cur_off >= q->bs)) { state.state = BBQ_BLOCK_DONE; @@ -494,14 +513,14 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) { // 获取下一个block uint64_t cur = 0; - struct bbq_block * n_blk = &(q->blocks[(bbq_idx(q, ph) + 1) & q->idx_mask]); + struct bbq_block * n_blk = &(q->blocks[(bbq_head_idx(q, ph) + 1) & q->idx_mask]); uint64_t ph_vsn = bbq_head_vsn(q, ph); if (BBQ_F_CHK_DROP_OLD(q->flags)) { cur = atomic_load(&n_blk->committed); // 生产者避免前进到上一轮中尚未完全提交的区块 - if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_off(q, cur) != q->bs) + if (bbq_cur_vsn(q, cur) == ph_vsn && bbq_cur_off(q, cur) != q->bs) { return BBQ_NOT_AVAILABLE; } @@ -510,14 +529,14 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) { cur = atomic_load(&n_blk->consumed); uint64_t reserved; - uint64_t consumed_off = bbq_off(q, cur); + uint64_t consumed_off = bbq_cur_off(q, cur); uint64_t consumed_vsn = bbq_cur_vsn(q, cur); if (consumed_vsn < ph_vsn || // 生产者赶上了消费者 (consumed_vsn == ph_vsn && consumed_off != q->bs)) { reserved = atomic_load(&n_blk->reserved); - if (bbq_off(q, reserved) == consumed_off) + if (bbq_cur_off(q, reserved) == consumed_off) { return BBQ_NO_ENTRY; } @@ -529,54 +548,80 @@ enum bbq_queue_state advance_phead(struct bbq * q, uint64_t ph) } // 用head的version初始化下一个块,version在高位,version+1,idex/offset清零,如果没有被其他线程执行过,数值会高于旧值。多线程同时只更新一次。 - uint64_t new_vsn = set_cur_vsn(q, ph_vsn + 1); - fetch_max(&n_blk->committed, new_vsn); - fetch_max(&n_blk->allocated, new_vsn); + uint64_t new_vsn = bbq_set_cur_vsn(q, ph_vsn + 1); + bbq_fetch_max(&n_blk->committed, new_vsn); + bbq_fetch_max(&n_blk->allocated, new_vsn); // 索引+1,当超过索引范围,也就是循环下一轮块时,version+1 - fetch_max(&q->phead, ph + 1); + bbq_fetch_max(&q->phead, ph + 1); return BBQ_SUCCESS; } -static uint32_t bbq_free_space_set(struct bbq * q, int32_t ret_status, uint64_t ph, struct bbq_block * blk_ph) +static uint32_t bbq_wait_consumed_set(struct bbq * q, uint64_t * ch_ptr, uint64_t * ph_ptr, struct bbq_block * blk_ph) { - if (ret_status == BBQ_QUEUE_FULL) + uint64_t ch; + uint64_t ph; + if (ch_ptr != NULL) { - return 0; + ch = *ch_ptr; + } + else + { + ch = atomic_load(&q->chead); } - uint64_t ch = atomic_load(&q->chead); - struct bbq_block * blk_ch = &(q->blocks[bbq_idx(q, ch)]); + if (ph_ptr != NULL) + { + ph = *ph_ptr; + } + else + { + ph = atomic_load(&q->phead); + } - // uint64_t ph_vsn = bbq_head_vsn(q, ph); - // uint64_t ch_vsn = bbq_head_vsn(q, ch); - uint64_t ph_idx = bbq_idx(q, ph); - uint64_t ch_idx = bbq_idx(q, ch); - uint64_t committed_off = bbq_off(q, atomic_load(&blk_ph->committed)); - uint64_t reserved_off = bbq_off(q, atomic_load(&blk_ch->reserved)); + uint64_t ph_idx = bbq_head_idx(q, ph); + uint64_t ch_idx = bbq_head_idx(q, ch); + uint64_t committed_off = bbq_cur_off(q, atomic_load(&blk_ph->committed)); - if (BBQ_F_CHK_DROP_OLD(q->flags)) + struct bbq_block * blk_ch = &(q->blocks[bbq_head_idx(q, ch)]); + uint64_t reserved_off = bbq_cur_off(q, atomic_load(&blk_ch->reserved)); + + // "生产者"超过"消费者"的索引(即块)的个数 + uint64_t idx_diff = ph_idx >= ch_idx ? ph_idx - ch_idx : q->bn - ch_idx + ph_idx; + if (!BBQ_F_CHK_DROP_OLD(q->flags)) { - // TODO - return 0; + // 这里idx_diff-1=-1也是正确。 + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); } - else + + uint64_t ch_vsn = bbq_head_vsn(q, ch); + uint64_t ph_vsn = bbq_head_vsn(q, ph); + + if (ph_vsn == ch_vsn || (ph_vsn == (ch_vsn + 1) && (ph_idx < ch_idx))) { - // 生产者到消费者的距离 - uint64_t idx_diff = ph_idx >= ch_idx ? q->bn - ph_idx + ch_idx : ch_idx - ph_idx; - return (idx_diff - 1) * q->bs + (q->bs - committed_off + reserved_off); + return (idx_diff - 1) * q->bs + (q->bs - reserved_off + committed_off); } + + // 发生了覆盖 + if (ph_idx == ch_idx) + { + // 当前块以及之前已生产的都作废 + return 0; + } + + return (idx_diff - 1) * q->bs + committed_off; } /* 消息队列入队 */ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32_t n, uint32_t flag, - uint32_t * free_space) + uint32_t * wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || data == NULL) { - ret.status = BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + ret.status = bbq_errno; return ret; } @@ -584,7 +629,7 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 { // 获取当前phead,转为索引后获取到当前的blk uint64_t ph = atomic_load(&q->phead); - struct bbq_block * blk = &(q->blocks[bbq_idx(q, ph)]); + struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ph)]); struct bbq_queue_state_s state = allocate_entry(q, blk, n); switch (state.state) @@ -603,27 +648,31 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 if (pstate == BBQ_NO_ENTRY) { - ret.status = BBQ_QUEUE_FULL; + bbq_errno = BBQ_ERR_FULL; + ret.status = bbq_errno; } else if (pstate == BBQ_NOT_AVAILABLE) { - ret.status = BBQ_QUEUE_BUSY; + bbq_errno = BBQ_ERR_BUSY; + ret.status = bbq_errno; } else { - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; } break; } default: - ret.status = BBQ_ERROR; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; break; } - if (free_space != NULL) + if (wait_consumed != NULL) { - *free_space = bbq_free_space_set(q, ret.status, ph, blk); + *wait_consumed = bbq_wait_consumed_set(q, NULL, &ph, blk); } return ret; @@ -632,24 +681,26 @@ static struct bbq_status __bbq_enqueue(struct bbq * q, void const * data, uint32 int bbq_enqueue(struct bbq * q, void * const * data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } int bbq_enqueue_elem(struct bbq * q, void const * data) { + bbq_errno = BBQ_OK; struct bbq_status ret = __bbq_enqueue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } /* 更新成功 reserve成功的个数 */ -uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) +uint32_t bbq_reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) { // TODO:逻辑可以合并 if (n == 1) { // fetch_max返回的是旧值,如果旧值等于局部变量reserved,则代表数据由当前线程更新 - if (fetch_max(aotmic, reserved + 1) == reserved) + if (bbq_fetch_max(aotmic, reserved + 1) == reserved) { return 1; } @@ -663,13 +714,13 @@ uint32_t reserve_update(bbq_cursor * aotmic, uint64_t reserved, uint32_t n) } } -struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n) +struct bbq_queue_state_s bbq_reserve_entry(struct bbq * q, struct bbq_block * block, uint32_t n) { while (true) { struct bbq_queue_state_s state; uint64_t reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); uint64_t reserved_svn = bbq_cur_vsn(q, reserved); if (reserved_off < q->bs) @@ -686,7 +737,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, } uint64_t committed = atomic_load(&block->committed); - uint64_t committed_off = bbq_off(q, committed); + uint64_t committed_off = bbq_cur_off(q, committed); if (committed_off == reserved_off) { state.state = BBQ_NO_ENTRY; @@ -697,7 +748,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, if (committed_off != q->bs) { uint64_t allocated = atomic_load(&block->allocated); - if (bbq_off(q, allocated) != committed_off) + if (bbq_cur_off(q, allocated) != committed_off) { state.state = BBQ_NOT_AVAILABLE; return state; @@ -705,7 +756,7 @@ struct bbq_queue_state_s reserve_entry(struct bbq * q, struct bbq_block * block, } uint32_t tmp = committed_off - reserved_off; - uint32_t reserved_cnt = reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); + uint32_t reserved_cnt = bbq_reserve_update(&block->reserved, reserved, tmp < n ? tmp : n); if (reserved_cnt > 0) { // TODO:多entry时关注 state.state = BBQ_RESERVED; @@ -791,7 +842,7 @@ bool consume_entry(struct bbq * q, struct bbq_entry_desc * e, void * deq_data, u bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) { - uint64_t ch_idx = bbq_idx(q, ch); + uint64_t ch_idx = bbq_head_idx(q, ch); struct bbq_block * n_blk = &(q->blocks[(ch_idx + 1) & q->idx_mask]); uint64_t ch_vsn = bbq_head_vsn(q, ch); @@ -803,7 +854,7 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) // 第一个块是一个特殊情况,因为与其他块相比,它的版本总是相差一个。因此,如果 ch_idx == 0,我们在比较中加 1 if (committed_vsn < ver + (ch_idx == 0)) return false; - fetch_max(&n_blk->reserved, set_cur_vsn(q, committed_vsn)); + bbq_fetch_max(&n_blk->reserved, bbq_set_cur_vsn(q, committed_vsn)); } else { @@ -812,82 +863,90 @@ bool advance_chead(struct bbq * q, uint64_t ch, uint64_t ver) // 消费者追上了生产者,下一块还未开始生产 return false; } - uint64_t new_vsn = set_cur_vsn(q, ch_vsn + 1); - fetch_max(&n_blk->consumed, new_vsn); - fetch_max(&n_blk->reserved, new_vsn); + uint64_t new_vsn = bbq_set_cur_vsn(q, ch_vsn + 1); + bbq_fetch_max(&n_blk->consumed, new_vsn); + bbq_fetch_max(&n_blk->reserved, new_vsn); } - fetch_max(&q->chead, ch + 1); + bbq_fetch_max(&q->chead, ch + 1); return true; } /* 消息队列出队 */ -static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type) +static struct bbq_status __bbq_dequeue(struct bbq * q, void * deq_data, uint32_t n, uint32_t data_type, + uint32_t * wait_consumed) { struct bbq_status ret = {.status = 0, .actual_burst = 0}; if (q == NULL || deq_data == NULL) { - ret.status = BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + ret.status = bbq_errno; return ret; } while (true) { uint64_t ch = atomic_load(&q->chead); - struct bbq_block * blk = &(q->blocks[bbq_idx(q, ch)]); - + struct bbq_block * blk = &(q->blocks[bbq_head_idx(q, ch)]); struct bbq_queue_state_s state; - state = reserve_entry(q, blk, n); + state = bbq_reserve_entry(q, blk, n); switch (state.state) { case BBQ_RESERVED: - if (consume_entry(q, &state.e, deq_data, data_type)) - { - ret.status = BBQ_OK; - ret.actual_burst = state.e.actual_burst; - return ret; - } - else + if (!consume_entry(q, &state.e, deq_data, data_type)) { continue; } + ret.status = BBQ_OK; + ret.actual_burst = state.e.actual_burst; + break; case BBQ_NO_ENTRY: - ret.status = BBQ_QUEUE_EMPTY; - return ret; + bbq_errno = BBQ_ERR_EMPTY; + ret.status = bbq_errno; + break; case BBQ_NOT_AVAILABLE: - ret.status = BBQ_QUEUE_BUSY; - return ret; + bbq_errno = BBQ_ERR_BUSY; + ret.status = bbq_errno; + break; case BBQ_BLOCK_DONE: if (advance_chead(q, ch, state.vsn)) { continue; } - else - { - ret.status = BBQ_QUEUE_EMPTY; - return ret; - } + bbq_errno = BBQ_ERR_EMPTY; + ret.status = bbq_errno; + break; default: - ret.status = BBQ_ERROR; - return ret; + bbq_errno = BBQ_ERR; + ret.status = bbq_errno; + break; + } + + if (wait_consumed != NULL) + { + *wait_consumed = bbq_wait_consumed_set(q, &ch, NULL, blk); } + + return ret; } } int bbq_dequeue(struct bbq * q, void ** data) { - struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE); + bbq_errno = BBQ_OK; + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } int bbq_dequeue_elem(struct bbq * q, void * data) { - struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE); + bbq_errno = BBQ_OK; + struct bbq_status ret = __bbq_dequeue(q, data, 1, BBQ_DATA_TYPE_SINGLE, NULL); return ret.status; } -uint32_t bbq_max_burst(struct bbq * q, uint32_t n) +static uint32_t bbq_max_burst(struct bbq * q, uint32_t n) { uint32_t burst = n; if (burst > q->bs) @@ -898,16 +957,19 @@ uint32_t bbq_max_burst(struct bbq * q, uint32_t n) return burst; } -static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n) +static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } if (!BBQ_F_CHK_VALUE(q->flags)) { - return BBQ_QUEUE_DATA_ERR; + bbq_errno = BBQ_ERR_NOT_SUPPORT; + return 0; } uint32_t burst = 0; @@ -918,7 +980,7 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D); + ret = __bbq_dequeue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -930,11 +992,13 @@ static uint32_t bbq_dequeue_burst_one_dimensional(struct bbq * q, void * obj_tab return ready; } -static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n) +static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } uint32_t burst = 0; @@ -945,7 +1009,7 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_ta while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D); + ret = __bbq_dequeue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -958,16 +1022,19 @@ static uint32_t bbq_dequeue_burst_two_dimensional(struct bbq * q, void ** obj_ta } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n) +static uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } if (!BBQ_F_CHK_VALUE(q->flags)) { - return BBQ_QUEUE_DATA_ERR; + bbq_errno = BBQ_ERR_NOT_SUPPORT; + return 0; } uint32_t burst = 0; @@ -978,7 +1045,7 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_tabl while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, NULL); + ret = __bbq_enqueue(q, obj, burst, BBQ_DATA_TYPE_ARRAY_1D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -991,11 +1058,13 @@ uint32_t bbq_enqueue_burst_one_dimensional(struct bbq * q, void const * obj_tabl } /* 尝试一次入队多个数据,直到达到最大数量,或是入队失败 */ -uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space) +static uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, + uint32_t * wait_consumed) { if (q == NULL || obj_table == NULL) { - return BBQ_NULL_PTR; + bbq_errno = BBQ_ERR_INPUT_NULL; + return 0; } uint32_t burst = 0; @@ -1006,7 +1075,7 @@ uint32_t bbq_enqueue_burst_two_dimensional(struct bbq * q, void * const * obj_ta while (ready < n) { burst = bbq_max_burst(q, n - ready); - ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, free_space); + ret = __bbq_enqueue(q, obj_table_tmp, burst, BBQ_DATA_TYPE_ARRAY_2D, wait_consumed); if (ret.status != BBQ_OK) { break; @@ -1025,8 +1094,8 @@ bool bbq_empty(struct bbq * q) uint64_t ph_vsn = bbq_head_vsn(q, phead); uint64_t ch_vsn = bbq_head_vsn(q, chead); - uint64_t ph_idx = bbq_idx(q, phead); - uint64_t ch_idx = bbq_idx(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); struct bbq_block * block; @@ -1038,14 +1107,14 @@ bool bbq_empty(struct bbq * q) block = &q->blocks[ph_idx]; if (ph_vsn == ch_vsn) { - if (bbq_off(q, atomic_load(&block->reserved)) == bbq_off(q, atomic_load(&block->committed))) + if (bbq_cur_off(q, atomic_load(&block->reserved)) == bbq_cur_off(q, atomic_load(&block->committed))) { return true; } } bbq_cursor reserved = atomic_load(&block->reserved); - uint64_t reserved_off = bbq_off(q, reserved); + uint64_t reserved_off = bbq_cur_off(q, reserved); if (BBQ_F_CHK_DROP_OLD(q->flags) && ph_vsn > ch_vsn && reserved_off != q->bs) { @@ -1057,29 +1126,35 @@ bool bbq_empty(struct bbq * q) return false; } -uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n) +uint32_t bbq_enqueue_burst_elem(struct bbq * q, void const * obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_enqueue_burst_one_dimensional(q, obj_table, n); + bbq_errno = BBQ_OK; + return bbq_enqueue_burst_one_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n) +uint32_t bbq_enqueue_burst_elem_two_dimensional(struct bbq * q, void * const * obj_table, uint32_t n, + uint32_t * wait_consumed) { - return bbq_enqueue_burst_two_dimensional(q, obj_table, n, NULL); + bbq_errno = BBQ_OK; + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * free_space) +uint32_t bbq_enqueue_burst(struct bbq * q, void * const * obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_enqueue_burst_two_dimensional(q, obj_table, n, free_space); + bbq_errno = BBQ_OK; + return bbq_enqueue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n) +uint32_t bbq_dequeue_burst(struct bbq * q, void ** obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_dequeue_burst_two_dimensional(q, obj_table, n); + bbq_errno = BBQ_OK; + return bbq_dequeue_burst_two_dimensional(q, obj_table, n, wait_consumed); } -uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n) +uint32_t bbq_dequeue_burst_elem(struct bbq * q, void * obj_table, uint32_t n, uint32_t * wait_consumed) { - return bbq_dequeue_burst_one_dimensional(q, obj_table, n); + bbq_errno = BBQ_OK; + return bbq_dequeue_burst_one_dimensional(q, obj_table, n, wait_consumed); } bool bbq_malloc_free_equal() @@ -1165,15 +1240,15 @@ void bbq_debug_block_print(struct bbq * q, struct bbq_block * block) bbq_cursor committed = atomic_load(&block->committed); bbq_cursor reserved = atomic_load(&block->reserved); bbq_cursor consumed = atomic_load(&block->consumed); - printf(" allocated:%lu committed:%lu reserved:%lu", bbq_off(q, allocated), bbq_off(q, committed), - bbq_off(q, reserved)); + printf(" allocated:%lu committed:%lu reserved:%lu", bbq_cur_off(q, allocated), bbq_cur_off(q, committed), + bbq_cur_off(q, reserved)); if (BBQ_F_CHK_DROP_OLD(q->flags)) { printf("\n"); } else { - printf(" consumed:%lu\n", bbq_off(q, consumed)); + printf(" consumed:%lu\n", bbq_cur_off(q, consumed)); } } @@ -1183,18 +1258,18 @@ void bbq_debug_struct_print(struct bbq * q) uint64_t phead = atomic_load(&q->phead); uint64_t chead = atomic_load(&q->chead); - printf("block number:%lu block size:%lu total entries:%lu\n", q->bn, q->bs, q->bn * q->bs); - printf("producer header idx:%lu vsn:%lu\n", bbq_idx(q, phead), bbq_head_vsn(q, phead)); + printf("block number:%u block size:%u total entries:%u\n", q->bn, q->bs, q->bn * q->bs); + printf("producer header idx:%lu vsn:%lu\n", bbq_head_idx(q, phead), bbq_head_vsn(q, phead)); - uint64_t ph_idx = bbq_idx(q, phead); - uint64_t ch_idx = bbq_idx(q, chead); + uint64_t ph_idx = bbq_head_idx(q, phead); + uint64_t ch_idx = bbq_head_idx(q, chead); if (ph_idx != ch_idx) { printf("block[%lu]\n", ph_idx); bbq_debug_block_print(q, &(q->blocks[ph_idx])); } - printf("consumer header idx:%lu vsn:%lu\n", bbq_idx(q, chead), bbq_head_vsn(q, chead)); + printf("consumer header idx:%lu vsn:%lu\n", bbq_head_idx(q, chead), bbq_head_vsn(q, chead)); printf("block[%lu]\n", ch_idx); bbq_debug_block_print(q, &(q->blocks[ch_idx])); }
\ No newline at end of file diff --git a/infra/src/vnode_common.c b/infra/src/vnode_common.c index 489b708..b1643f9 100644 --- a/infra/src/vnode_common.c +++ b/infra/src/vnode_common.c @@ -47,8 +47,8 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc)); MR_VERIFY_MALLOC(desc); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) - desc->tunnel_object = bbq_create(sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); +#ifdef BBQ_SPSC + desc->tunnel_object = bbq_create(symbol, sz_exclusive + sz_shared, BBQ_SOCKET_ID_ANY, BBQ_F_RETRY_NEW); #else desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); @@ -81,7 +81,7 @@ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_excl errout: if (desc->tunnel_object != NULL) { -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC bbq_destory(desc->tunnel_object); #else rte_ring_free(desc->tunnel_object); @@ -112,7 +112,7 @@ static int tunnel_delete(struct tunnel_desc * desc) } struct rte_mbuf * mbuf; -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC while (bbq_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) #else while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) @@ -128,7 +128,7 @@ static int tunnel_delete(struct tunnel_desc * desc) infra_rte_pktmbuf_free(desc->rt_buffer[i]); } -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC MR_VERIFY_2(bbq_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); #else MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); @@ -136,7 +136,7 @@ static int tunnel_delete(struct tunnel_desc * desc) rte_free(desc->en_buffer); rte_free(desc->rt_buffer); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC bbq_destory(desc->tunnel_object); #else rte_ring_free(desc->tunnel_object); diff --git a/infra/src/vnode_common.h b/infra/src/vnode_common.h index ae630a0..0fd1334 100644 --- a/infra/src/vnode_common.h +++ b/infra/src/vnode_common.h @@ -26,7 +26,7 @@ struct tunnel_desc /* Tunnel Name */ char symbol[MR_SYMBOL_MAX]; -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC struct bbq * tunnel_object; #else /* Tunel Object, real object to hold pointers */ diff --git a/infra/src/vnode_mirror.c b/infra/src/vnode_mirror.c index 78afc90..9c0261e 100644 --- a/infra/src/vnode_mirror.c +++ b/infra/src/vnode_mirror.c @@ -117,7 +117,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons rte_cldemote(RTE_PTR_ADD(mbuf, RTE_CACHE_LINE_SIZE * 2)); } +#ifdef BBQ_SPSC + uint32_t wait_consumed = 0; +#else unsigned int n_free_space = 0; +#endif unsigned int n_send = 0; bool is_acquire_credit_success = dist_tunnel_acquire_credits(prod->vnode, desc, (int32_t)n_to_send); @@ -125,8 +129,8 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons /* acquire credit, if failed, drop all the packets */ if (likely(is_acquire_credit_success)) { -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) - n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); +#ifdef BBQ_SPSC + n_send = bbq_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &wait_consumed); #else n_send = rte_ring_sp_enqueue_burst(desc->tunnel_object, (void **)desc->en_buffer, n_to_send, &n_free_space); #endif @@ -174,7 +178,11 @@ static inline void dist_tunnel_flush(struct vnode_prod * prod, struct vnode_cons desc->total_len += n_send_len; /* q_len */ +#ifdef BBQ_SPSC + desc->q_len = wait_consumed; +#else desc->q_len = desc->tunnel_size - n_free_space; +#endif desc->q_len_avg += 0.2F * ((float)desc->q_len - desc->q_len_avg); // clear the buffer @@ -223,8 +231,8 @@ out: static inline int dist_tunnel_dequeue(struct vnode * vnode_desc, struct tunnel_desc * tunnel_desc, void * obj, unsigned int nr_max_obj) { -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) - unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj); +#ifdef BBQ_SPSC + unsigned int nr_deq = bbq_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL); #else unsigned int nr_deq = rte_ring_sc_dequeue_burst(tunnel_desc->tunnel_object, obj, nr_max_obj, NULL); #endif diff --git a/infra/test/TestVNode.cc b/infra/test/TestVNode.cc index 7101e71..fae95a8 100644 --- a/infra/test/TestVNode.cc +++ b/infra/test/TestVNode.cc @@ -242,7 +242,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -256,7 +256,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -271,7 +271,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 2, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -286,7 +286,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * rt_objects[TEST_MBUFS_COUNT]; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 3, rt_objects, RTE_DIM(rt_objects)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -315,7 +315,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) EXPECT_EQ(prod_on_line_total, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(prod_deliver_total, 128); EXPECT_EQ(prod_missed_total, 1920); #else @@ -326,7 +326,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) /* on cons side */ struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat[0].on_line, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat[0].deliver, 128); EXPECT_EQ(cons_stat[0].missed, 1920); #else @@ -336,7 +336,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleThreadEnqueue) struct rte_mbuf * deq_objs[TEST_MBUFS_COUNT]; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret, 128); #else EXPECT_EQ(deq_ret, 124); @@ -379,7 +379,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) EXPECT_EQ(enq_ret, 0); int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 480); #else EXPECT_EQ(rt_ret, 481); @@ -397,7 +397,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) /* until here, we have 31 objects enqueue, so only 544 mbufs can be enqueue. */ int deq_ret_1 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret_1, 32); #else EXPECT_EQ(deq_ret_1, 31); @@ -405,7 +405,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) struct vnode_prod_stat * prod_stat = vnode_mirror_prod_stat_get(prod); EXPECT_EQ(prod_stat->on_line, 1024); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(prod_stat->deliver, 32); EXPECT_EQ(prod_stat->missed, 992); #else @@ -415,7 +415,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) struct vnode_cons_stat * cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(cons_stat->on_line, 1024); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat->deliver, 32); EXPECT_EQ(cons_stat->missed, 992); #else @@ -434,7 +434,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) /* get the packet needs to be free */ int rt_ret_3 = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret_3, 480); #else EXPECT_EQ(rt_ret_3, 481); @@ -449,7 +449,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) rte_pktmbuf_free_bulk(deq_objs, rt_ret_4); int deq_ret_2 = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret_2, 32); #else EXPECT_EQ(deq_ret_2, 31); @@ -459,7 +459,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) cons_stat = vnode_mirror_cons_stat_get(cons); EXPECT_EQ(prod_stat->on_line, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(prod_stat->deliver, 64); EXPECT_EQ(prod_stat->missed, 1984); #else @@ -468,7 +468,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueAndDequeueUseSharedCredict) #endif EXPECT_EQ(cons_stat->on_line, 2048); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat->deliver, 64); EXPECT_EQ(cons_stat->missed, 1984); #else @@ -533,7 +533,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* retrieve the drop packets */ struct rte_mbuf * rt_objs[128] = {}; int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, rt_objs, 128); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 32); #else EXPECT_EQ(rt_ret, 33); @@ -550,7 +550,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* retrieve the drop packets */ rt_ret = vnode_mirror_rt_object_retrieve(prod, 1, rt_objs, 128); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 32); #else EXPECT_EQ(rt_ret, 33); @@ -560,7 +560,7 @@ TEST_F(TestCaseVNode, TestVNodeMultipleQueueUseSharedCredict) /* dequeue */ struct rte_mbuf * deq_objs[128] = {}; int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret, 64); #else EXPECT_EQ(deq_ret, 62); @@ -607,7 +607,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) EXPECT_EQ(enq_ret, 0); int rt_ret = vnode_mirror_rt_object_retrieve(prod, 0, deq_objs, 512); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(rt_ret, 0); #else EXPECT_EQ(rt_ret, 1); @@ -640,7 +640,7 @@ TEST_F(TestCaseVNode, TestVNodeEnqueueMultipleQueue) rte_pktmbuf_free_bulk(deq_objs, 512); int deq_ret = vnode_mirror_dequeue_burst(cons, 0, deq_objs, RTE_DIM(deq_objs)); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(deq_ret, 512); #else EXPECT_EQ(deq_ret, 511); @@ -1055,7 +1055,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC // 32/8=4 EXPECT_EQ(cons_stat[0].q_len_max, 4); EXPECT_LE(cons_stat[0].q_len_avg_max, 4); @@ -1076,7 +1076,7 @@ TEST_F(TestCaseVNodeQueue, MultiQueueEnqueue) EXPECT_EQ(cons_stat[0].on_line, 32); EXPECT_EQ(cons_stat[0].deliver, 32); EXPECT_EQ(cons_stat[0].missed, 0); -#if defined(BBQ_SPSC) || defined(BBQ_MPSC) +#ifdef BBQ_SPSC EXPECT_EQ(cons_stat[0].q_len_max, 4); EXPECT_LE(cons_stat[0].q_len_avg_max, 4); // TODO:check??? #else |
