summaryrefslogtreecommitdiff
path: root/bbq/unittest/ut_bbq_func.c
diff options
context:
space:
mode:
Diffstat (limited to 'bbq/unittest/ut_bbq_func.c')
-rw-r--r--bbq/unittest/ut_bbq_func.c615
1 files changed, 615 insertions, 0 deletions
diff --git a/bbq/unittest/ut_bbq_func.c b/bbq/unittest/ut_bbq_func.c
new file mode 100644
index 0000000..88910ca
--- /dev/null
+++ b/bbq/unittest/ut_bbq_func.c
@@ -0,0 +1,615 @@
+/*
+ * @Author: liuyu
+ * @LastEditTime: 2024-07-07 22:34:03
+ * @Email: [email protected]
+ * @Describe: TODO
+ */
+
+#include "ut_bbq_func.h"
+#include "bbq.h"
+#include <pthread.h>
+#include <string.h>
+#include <sys/prctl.h>
+#include <sys/time.h>
+#include <unistd.h>
+
/* Per-module allocation counters used to verify that every ut_malloc()
 * is balanced by a ut_free(). Updated concurrently via atomic_fetch_add.
 * NOTE(review): "aotmic_uint64" looks like a misspelling of "atomic_uint64"
 * declared in ut_bbq_func.h — confirm and fix at the typedef site. */
struct ut_memory {
    aotmic_uint64 malloc_cnt; /* successful ut_malloc() calls */
    aotmic_uint64 free_cnt;   /* non-NULL ut_free() calls */
};
+
/* Per-module malloc/free counters, indexed by enum ut_module. */
struct ut_memory ut_memory_g[UT_MODULE_MAX] = {0};

/* enum ut_ring_type -> display-name lookup used by ut_ring_type_enum2str(). */
char *ut_ring_type_map[UT_RING_TYPE_MAX] = {
    [UT_RING_TYPE_BBQ] = UT_RING_TYPE_BBQ_STR,
    [UT_RING_TYPE_DPDK] = UT_RING_TYPE_DPDK_STR,
    [UT_RING_TYPE_RMIND] = UT_RING_TYPE_RMIND_STR,
};
+
+void *ut_malloc(enum ut_module module, size_t size) {
+ void *ptr = malloc(size);
+ if (ptr != NULL) {
+ atomic_fetch_add(&ut_memory_g[module].malloc_cnt, 1);
+ }
+
+ return ptr;
+}
+
+void ut_free(enum ut_module module, void *ptr) {
+ if (ptr != NULL) {
+ atomic_fetch_add(&ut_memory_g[module].free_cnt, 1);
+ }
+ free(ptr);
+}
+
+bool ut_malloc_free_equal() {
+ bool ret = true;
+ for (int i = 0; i < UT_MODULE_MAX; i++) {
+ uint64_t malloc_cnt = atomic_load(&ut_memory_g[i].malloc_cnt);
+ uint64_t free_cnt = atomic_load(&ut_memory_g[i].free_cnt);
+ if (malloc_cnt != free_cnt) {
+ UT_ERR_LOG("[module:%d] malloc:%lu free:%lu, test malloc-free not equal\n", i, malloc_cnt, free_cnt);
+ ret = false;
+ }
+ }
+
+ return ret;
+}
+
+void ut_memory_counter_clear() {
+ memset(ut_memory_g, 0, sizeof(ut_memory_g));
+}
+
+void ut_memory_counter_print() {
+ for (int i = 0; i < UT_MODULE_MAX; i++) {
+ uint64_t malloc_cnt = atomic_load(&ut_memory_g[i].malloc_cnt);
+ uint64_t free_cnt = atomic_load(&ut_memory_g[i].free_cnt);
+ if (malloc_cnt == 0 && free_cnt == 0) {
+ continue;
+ }
+
+ UT_INFO_LOG("[%d]test malloc:%lu free:%lu", i,
+ atomic_load(&ut_memory_g[i].malloc_cnt),
+ atomic_load(&ut_memory_g[i].free_cnt));
+ }
+
+ if (ut_malloc_free_equal()) {
+ UT_INFO_LOG("all memory free");
+ } else {
+ UT_ERR_LOG("memory not all free");
+ }
+}
+
+struct ut_metric ut_clock_time_get() {
+ struct ut_metric metric = {0};
+ clock_gettime(CLOCK_REALTIME, &metric.timestamp); // 系统实时时间,随系统实时时间改变而改变
+ return metric;
+}
+
+uint64_t ut_clock_time_to_ns(struct ut_metric *metric) {
+ return metric->timestamp.tv_nsec + metric->timestamp.tv_sec * 1000 * 1000 * 1000;
+}
+
+double ut_clock_time_to_double(struct ut_metric *metric) {
+ return metric->timestamp.tv_sec +
+ metric->timestamp.tv_nsec * 1.0 / 1000 / 1000 / 1000;
+}
+
+bool ut_clock_time_is_zero(struct ut_metric *metric) {
+ return metric->timestamp.tv_sec == 0 && metric->timestamp.tv_nsec == 0;
+}
+
/* Strictly-after comparison: true iff timestamp a is later than b.
 * Equal timestamps compare as "not after". */
bool ut_timespec_is_after(const struct timespec *a, const struct timespec *b) {
    if (a->tv_sec != b->tv_sec) {
        return a->tv_sec > b->tv_sec;
    }
    return a->tv_nsec > b->tv_nsec;
}
+
+struct ut_metric ut_clock_time_sub(struct ut_metric now, struct ut_metric last) {
+ struct ut_metric diff = {
+ .timestamp.tv_sec = now.timestamp.tv_sec - last.timestamp.tv_sec,
+ .timestamp.tv_nsec = now.timestamp.tv_nsec - last.timestamp.tv_nsec,
+ };
+
+ if (now.timestamp.tv_nsec > last.timestamp.tv_nsec) {
+ diff.timestamp.tv_nsec = now.timestamp.tv_nsec - last.timestamp.tv_nsec;
+ } else {
+ // 从秒借位
+ diff.timestamp.tv_sec--;
+ diff.timestamp.tv_nsec = 1000 * 1000 * 1000 + now.timestamp.tv_nsec - last.timestamp.tv_nsec;
+ }
+
+ return diff;
+}
+
+enum ut_workload ut_workload_str2enum(const char *workload) {
+ if (strcmp(workload, "simple") == 0) {
+ return UT_WORKLOAD_SIMPLE;
+ } else if (strcmp(workload, "complex") == 0) {
+ return UT_WORKLOAD_COMPLEX;
+ }
+
+ return UT_WORKLOAD_MAX;
+}
+
+enum ut_ring_type ut_ring_type_str2enum(const char *ring_type) {
+ if (strcmp(ring_type, UT_RING_TYPE_BBQ_STR) == 0) {
+ return UT_RING_TYPE_BBQ;
+ } else if (strcmp(ring_type, UT_RING_TYPE_DPDK_STR) == 0) {
+ return UT_RING_TYPE_DPDK;
+ } else if (strcmp(ring_type, UT_RING_TYPE_RMIND_STR) == 0) {
+ return UT_RING_TYPE_RMIND;
+ }
+
+ return UT_RING_TYPE_MAX;
+}
+
+char *ut_ring_type_enum2str(enum ut_ring_type ring_type) {
+ if (ring_type >= UT_RING_TYPE_MAX) {
+ return "unknown";
+ } else {
+ return ut_ring_type_map[ring_type];
+ }
+}
+
+int ut_setaffinity(int core_id) {
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+ CPU_SET(core_id, &mask);
+
+ if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) == -1) {
+ UT_ERR_LOG("pthread_setaffinity_np erro\n");
+ return BBQ_ERR;
+ }
+
+ return BBQ_OK;
+}
+
/* Default allocator hook handed to the BBQ ring: plain malloc; the NUMA
 * socket id is accepted for signature compatibility but ignored. */
void *ut_malloc_def_callback(int32_t socket_id __attribute__((unused)), size_t size) {
    void *mem = malloc(size);
    return mem;
}
+
/* Default deallocator hook handed to the BBQ ring; the size argument is
 * accepted for signature compatibility but not needed by plain free(). */
void ut_free_def_callback(void *ptr,
                          size_t size __attribute__((unused))) {
    free(ptr);
}
+
/* Adapter matching the ut_enqueue_burst_f signature: BBQ's burst enqueue
 * takes no thread index, so thread_idx is deliberately discarded here.
 * Returns bbq_enqueue_burst()'s count — presumably the number of objects
 * actually enqueued; confirm against bbq.h. */
uint32_t ut_bbq_enqueue_burst(void *ring, void **obj_table, uint32_t n, uint16_t thread_idx, uint32_t *wait_consumed) {
    UT_AVOID_WARNING(thread_idx);
    return bbq_enqueue_burst(ring, obj_table, n, wait_consumed);
}
+
/* Initialize a ut_queue backed by a BBQ ring according to cfg.
 * Selects the single-producer/single-consumer fast paths when there is at
 * most one of each, and chooses between the plain constructor and the
 * block-count ("bnbs") constructor based on cfg->ring.block_count.
 * Returns 0 on success, BBQ_ERR_INPUT_NULL if ring creation fails. */
int ut_queue_init_bbq(struct ut_cfg *cfg, struct ut_queue *q) {
#if 0
    // Enabling BBQ_F_ENABLE_STAT degrades performance.
    unsigned int flags = BBQ_F_RETRY_NEW | BBQ_F_ENABLE_STAT;
#else
    unsigned int flags = BBQ_F_RETRY_NEW;
#endif

    // At most one producer: enable the single-producer enqueue fast path.
    if (cfg->ring.producer_cnt <= 1) {
        flags |= BBQ_F_SP_ENQ;
    }

    // At most one consumer: enable the single-consumer dequeue fast path.
    if (cfg->ring.consumer_cnt <= 1) {
        flags |= BBQ_F_SC_DEQ;
    }

    if (cfg->ring.block_count == 0) {
        q->ring = bbq_create("ut_bbq", cfg->ring.entries_cnt, BBQ_SOCKET_ID_ANY, flags,
                             ut_malloc_def_callback, ut_free_def_callback);
    } else {
        // Split entries_cnt evenly across block_count blocks.
        // NOTE(review): assumes entries_cnt is a multiple of block_count — confirm.
        q->ring = bbq_create_with_bnbs("ut_bbq", cfg->ring.block_count,
                                       cfg->ring.entries_cnt / cfg->ring.block_count,
                                       BBQ_SOCKET_ID_ANY, flags, ut_malloc_def_callback, ut_free_def_callback);
    }

    if (q->ring == NULL) {
        UT_ERR_LOG("bbq create queue failed");
        return BBQ_ERR_INPUT_NULL;
    }

    // Wire the generic ut_queue operations to the BBQ implementations.
    q->ring_free_f = (ut_ring_free_f)bbq_destory;
    q->enqueue_f = (ut_ring_enqueue_f)bbq_enqueue;
    q->dequeue_f = (ut_ring_dequeue_f)bbq_dequeue;
    q->enqueue_burst_f = (ut_enqueue_burst_f)ut_bbq_enqueue_burst;
    q->dequeue_burst_f = (ut_dequeue_burst_f)bbq_dequeue_burst;
    return 0;
}
+
+void ut_queue_destory(struct ut_queue *q) {
+ if (q != NULL && q->ring_free_f != NULL) {
+ q->ring_free_f(q->ring);
+ }
+}
+
+bool ut_all_producer_exit(struct ut_info_s *ut_info) {
+ return atomic_load(&ut_info->ctl.producer_exit) == ut_info->cfg.ring.producer_cnt;
+}
+
/* Block until every worker thread has reached the start barrier, so all
 * producers and consumers begin the measured run simultaneously. */
void ut_wait_all_threads_ready(struct ut_ctl *ctl) {
    pthread_barrier_wait(&ctl->all_threads_start);
    UT_DBG_LOG("thread init done!");
}
+
/* Allocate and initialize the per-thread statistics/exit record.
 * Also preallocates entries_cnt magic-valued "simple" objects that the
 * simple workload re-enqueues (allocated for consumer threads too, where
 * they go unused). Aborts the process on allocation failure.
 * The record is handed back to the joiner via pthread_exit() and released
 * with ut_exit_data_destory(), which also frees t_arg. */
struct ut_exit_data *ut_exit_data_create(struct ut_thread_arg *t_arg) {
    struct ut_exit_data *exit_data = (struct ut_exit_data *)ut_malloc(UT_MODULE_COMMON, sizeof(struct ut_exit_data));
    if (exit_data == NULL) {
        UT_ERR_LOG("malloc failed");
        exit(-1);
    }

    size_t size = t_arg->info->cfg.ring.entries_cnt;
    exit_data->simple_data_cnt = size;
    exit_data->simple_data = ut_data_create(size, UT_DATA_MAGIC_TYPE);

    if (exit_data->simple_data == NULL) {
        UT_ERR_LOG("malloc failed");
        exit(-1);
    }
    exit_data->arg = t_arg; // record takes ownership of the thread arg
    exit_data->thread_id = pthread_self();
    exit_data->latency_ns = 0;
    exit_data->data_error_cnt = 0;
    // NOTE(review): the remaining fields (run_times, ok_cnt, metric_start/end,
    // op_*_latency_ns) stay uninitialized here and are assigned by the thread
    // body before being read — confirm no other reader exists.

    return exit_data;
}
+
/* Free an exit record produced by ut_exit_data_create(): the preallocated
 * simple objects, the thread argument the record owns, then the record
 * itself (order matters — data is read before each free). */
void ut_exit_data_destory(struct ut_exit_data *data) {
    ut_data_destory(data->simple_data, data->simple_data_cnt);
    ut_free(UT_MODULE_COMMON, data->arg);
    ut_free(UT_MODULE_COMMON, data);
}
+
+struct ut_data **ut_data_create(size_t cnt, enum ut_data_type data_type) {
+ struct ut_data **simple_data = ut_malloc(UT_MODULE_DATA, sizeof(*simple_data) * cnt);
+ struct ut_metric enqueue_time = ut_clock_time_get();
+ for (size_t i = 0; i < cnt; i++) {
+ simple_data[i] = ut_malloc(UT_MODULE_DATA, sizeof(*simple_data[i]));
+ if (data_type == UT_DATA_MAGIC_TYPE) {
+ simple_data[i]->data = UT_DATA_MAGIC;
+ } else {
+ simple_data[i]->data = (uintptr_t)(simple_data[i]);
+ }
+ simple_data[i]->enqueue_time = enqueue_time;
+ }
+
+ return simple_data;
+}
+
+void ut_data_destory(struct ut_data **data, size_t cnt) {
+ for (size_t i = 0; i < cnt; i++) {
+ ut_free(UT_MODULE_DATA, data[i]);
+ }
+ ut_free(UT_MODULE_DATA, data);
+}
+
+uint32_t ut_exec_enqueue(struct ut_queue *q, struct ut_data **data, size_t burst_cnt,
+ struct ut_metric *op_use_diff, uint16_t thread_idx) {
+ uint32_t enqueue_cnt = 0;
+ struct ut_metric op_use_start = ut_clock_time_get();
+ uint32_t wait_consumed = 0;
+ enqueue_cnt = q->enqueue_burst_f(q->ring, (void **)data, burst_cnt, thread_idx, &wait_consumed);
+ *op_use_diff = ut_clock_time_sub(ut_clock_time_get(), op_use_start);
+
+ return enqueue_cnt;
+}
+
+uint32_t ut_exec_dequeue(struct ut_queue *q, struct ut_data **data, size_t burst_cnt, struct ut_metric *op_use_diff) {
+ uint32_t dequeue_cnt = 0;
+
+ struct ut_metric op_use_start = ut_clock_time_get();
+ dequeue_cnt = q->dequeue_burst_f(q->ring, (void **)data, burst_cnt, NULL);
+ *op_use_diff = ut_clock_time_sub(ut_clock_time_get(), op_use_start);
+
+ return dequeue_cnt;
+}
+
/* Producer thread entry point.
 * Pins itself to t_arg->core, waits for all threads at the start barrier,
 * then enqueues bursts until either its share of cfg->run.run_ok_times
 * succeeds or info->ctl.running is cleared by the controller.
 * Per-thread statistics are returned to pthread_join() via a
 * heap-allocated ut_exit_data (released by ut_exit_data_destory). */
void *ut_thread_producer_start(void *arg) {
    uint32_t enqueue_cnt = 0;
    uint64_t ok_cnt = 0;    // total objects successfully enqueued
    uint64_t run_times = 0; // total burst attempts, including failed ones
    struct ut_thread_arg *t_arg = (struct ut_thread_arg *)arg;
    struct ut_info_s *info = t_arg->info;
    struct ut_cfg *cfg = &info->cfg;
    struct ut_queue *q = t_arg->q;
    struct ut_exit_data *exit_data = ut_exit_data_create(t_arg);

    char thread_name[128] = {0};
    uint64_t op_ok_latency_ns = 0;  // time spent inside successful enqueue calls
    uint64_t op_err_latency_ns = 0; // time spent inside failed enqueue calls
    // Each producer performs an equal share of the requested successes.
    uint64_t run_ok_times = cfg->run.run_ok_times / cfg->ring.producer_cnt;
    struct ut_metric op_latency = {0};
    snprintf(thread_name, sizeof(thread_name), "producer:%lu", exit_data->thread_id);
    prctl(PR_SET_NAME, thread_name);
    if (ut_setaffinity(t_arg->core) != BBQ_OK) {
        UT_ERR_LOG("ut_setaffinity error");
        exit(-1);
    }

    ut_wait_all_threads_ready(&info->ctl);
    UT_INFO_LOG("producer thread:%lx, core:%d", exit_data->thread_id, t_arg->core);

    exit_data->metric_start = ut_clock_time_get();
    while (true) {
        if ((run_ok_times > 0 && ok_cnt >= run_ok_times) || (!info->ctl.running)) {
            // Either the per-thread success quota was reached, or the run
            // timer expired and cleared ctl.running.
            break;
        }

        if (cfg->ring.workload == UT_WORKLOAD_SIMPLE) {
            // Simple workload: re-enqueue the same preallocated objects.
            enqueue_cnt = ut_exec_enqueue(q, exit_data->simple_data, cfg->ring.burst_cnt, &op_latency, t_arg->thread_idx);
        } else {
            // Complex workload: fresh objects each burst; whatever is
            // enqueued becomes owned (and freed) by a consumer.
            struct ut_data **data = ut_data_create(cfg->ring.burst_cnt, UT_DATA_UINTPTR_TYPE);
            if (data == NULL) {
                UT_ERR_LOG("malloc falied");
                exit(-1);
            }

            enqueue_cnt = ut_exec_enqueue(q, data, cfg->ring.burst_cnt, &op_latency, t_arg->thread_idx);
            // Free the objects that were NOT enqueued.
            for (uint32_t i = enqueue_cnt; i < cfg->ring.burst_cnt; i++) {
                ut_free(UT_MODULE_DATA, data[i]);
            }

            ut_free(UT_MODULE_DATA, data);
        }

        if (enqueue_cnt > 0) {
            ok_cnt += enqueue_cnt;
            op_ok_latency_ns += ut_clock_time_to_ns(&op_latency);
        } else {
            op_err_latency_ns += ut_clock_time_to_ns(&op_latency);
        }

        run_times++;
    }

    exit_data->metric_end = ut_clock_time_get();
    exit_data->run_times = run_times;
    exit_data->ok_cnt = ok_cnt;

    exit_data->op_ok_latency_ns = op_ok_latency_ns;
    exit_data->op_err_latency_ns = op_err_latency_ns;
    // Signal consumers that one more producer has finished.
    atomic_fetch_add(&info->ctl.producer_exit, 1);

    UT_DBG_LOG("producer-----> en_ok:%lu", ok_cnt);
    pthread_exit(exit_data);
}
+
/* Consumer thread entry point.
 * Pins itself to t_arg->core, waits at the start barrier, then dequeues
 * bursts until every producer has exited AND a dequeue comes back empty
 * (queue drained). The simple workload only validates the magic payload;
 * the complex workload also checks the self-pointer payload, accumulates
 * enqueue-to-dequeue latency, and frees each dequeued object.
 * Statistics are returned to pthread_join() via ut_exit_data. */
void *ut_thread_consumer_start(void *arg) {
    uint32_t deq_cnt = -1; // UINT32_MAX: the drain check cannot trip before the first dequeue
    uint64_t ok_cnt = 0;
    uint64_t run_times = 0;
    struct ut_thread_arg *t_arg = (struct ut_thread_arg *)arg;
    struct ut_info_s *info = t_arg->info;
    struct ut_cfg *cfg = &info->cfg;
    struct ut_queue *q = t_arg->q;
    struct ut_exit_data *exit_data = ut_exit_data_create(t_arg);
    uint64_t latency_ns = 0; // summed enqueue->dequeue latency (complex workload only)
    struct ut_metric op_latency = {0};
    uint64_t op_ok_latency_ns = 0;  // time spent inside successful dequeue calls
    uint64_t op_err_latency_ns = 0; // time spent inside empty dequeue calls
    uint64_t data_error_cnt = 0;
    char thread_name[128] = {0};
    // Scratch array sized for a full ring's worth of object pointers.
    struct ut_data **deq_data = ut_malloc(UT_MODULE_DATA, sizeof(*deq_data) * cfg->ring.entries_cnt);

    snprintf(thread_name, sizeof(thread_name), "consumer:%lu", exit_data->thread_id);
    prctl(PR_SET_NAME, thread_name);
    if (ut_setaffinity(t_arg->core) != BBQ_OK) {
        UT_ERR_LOG("ut_setaffinity error");
        exit(-1);
    }

    ut_wait_all_threads_ready(&info->ctl);
    UT_INFO_LOG("consumer thread:%lx, core:%d", exit_data->thread_id, t_arg->core);

    exit_data->metric_start = ut_clock_time_get();

    while (true) {
        if (ut_all_producer_exit(info) && deq_cnt == 0) {
            // All producers are gone and the previous dequeue was empty:
            // the queue is fully drained, stop consuming.
            break;
        }

        deq_cnt = ut_exec_dequeue(q, deq_data, cfg->ring.burst_cnt, &op_latency);
        if (deq_cnt > 0) {
            for (uint32_t i = 0; i < deq_cnt; i++) {
                struct ut_data *data = deq_data[i];
                if (cfg->ring.workload == UT_WORKLOAD_SIMPLE) {
                    // Simple workload: shared objects carry the magic value.
                    if (data->data != UT_DATA_MAGIC) {
                        UT_ERR_LOG("the obtained data is not consistent with the expectation, expect:%u actual:%lu", UT_DATA_MAGIC, data->data);
                        exit_data->data_error_cnt += 1;
                    }
                } else {
                    // Complex workload: each object stores its own address
                    // plus its enqueue timestamp; after validation this
                    // consumer owns and frees it.
                    struct ut_metric latency = ut_clock_time_sub(ut_clock_time_get(), data->enqueue_time);
                    if (ut_clock_time_is_zero(&data->enqueue_time)) {
                        UT_ERR_LOG("enqueue_time is 0");
                        exit(-1);
                    }

                    if (data->data != (uintptr_t)data) {
                        UT_ERR_LOG("the obtained data is not consistent with the expectation, expect:%lu actual:%lu", (uintptr_t)data, data->data);
                        data_error_cnt += 1;
                    }

                    latency_ns += ut_clock_time_to_ns(&latency);
                    ut_free(UT_MODULE_DATA, data);
                }
            }
            ok_cnt += deq_cnt;
            op_ok_latency_ns += ut_clock_time_to_ns(&op_latency);
        } else {
            op_err_latency_ns += ut_clock_time_to_ns(&op_latency);
        }

        run_times++;
    }

    exit_data->metric_end = ut_clock_time_get();
    exit_data->run_times = run_times;
    exit_data->ok_cnt = ok_cnt;
    exit_data->latency_ns = latency_ns;
    exit_data->op_ok_latency_ns = op_ok_latency_ns;
    exit_data->op_err_latency_ns = op_err_latency_ns;
    exit_data->data_error_cnt = data_error_cnt;

    ut_free(UT_MODULE_DATA, deq_data);
    UT_DBG_LOG("consumer-----> de_ok:%lu", ok_cnt);
    pthread_exit(exit_data);
}
+
/* When a run time is configured, sleep for it and then signal all worker
 * threads to stop; finally join every thread, collecting each one's
 * ut_exit_data pointer into exit_data[].
 * NOTE(review): ctl.running is written here and polled by the worker
 * loops without synchronization — appears to be an intentional
 * best-effort stop flag, but consider making it atomic_bool. */
void ut_wait_all_threads_exit(struct ut_info_s *info, uint32_t thread_cnt, pthread_t *threads, struct ut_exit_data **exit_data) {
    if (info->cfg.run.run_time > 0) {
        UT_DBG_LOG("sleep %lus, and notify all threads to exit...", info->cfg.run.run_time);
        sleep(info->cfg.run.run_time);
        info->ctl.running = false;
    }

    for (uint32_t i = 0; i < thread_cnt; i++) {
        pthread_join(threads[i], (void **)(&exit_data[i])); // wait for each thread to finish
    }
}
+void ut_one_thread_create(struct ut_info_s *info, struct ut_queue *q, enum ut_thread_type ttype, int core, uint16_t thread_id, pthread_t *thread) {
+ UT_DBG_LOG("thread type:%d core:%d", ttype, core);
+ struct ut_thread_arg *arg = (struct ut_thread_arg *)ut_malloc(UT_MODULE_COMMON, sizeof(struct ut_thread_arg)); // 线程回收时free
+ arg->info = info;
+ arg->q = q;
+ arg->ttype = ttype;
+ arg->core = core;
+ arg->thread_idx = thread_id;
+
+ if (ttype == UT_THREAD_PRODUCER) {
+ pthread_create(thread, NULL, ut_thread_producer_start, arg);
+ } else {
+ pthread_create(thread, NULL, ut_thread_consumer_start, arg);
+ }
+}
+
/* Advance core_id by one, saturating at max_id: once the last usable core
 * is reached, subsequent threads keep being placed on that core.
 * (Evaluates core_id more than once — safe for the plain int lvalues used
 * in this file; do not pass expressions with side effects.) */
#define CORE_ID_CHK_SET(core_id, max_id) \
    do { \
        core_id = (core_id + 1) <= max_id ? (core_id + 1) : core_id; \
    } while (0)
+
+pthread_t *ut_threads_create(struct ut_info_s *info, struct ut_queue *q) {
+ // 创建生产者消费者线程
+ uint16_t thread_id = 0;
+ struct ut_cfg *cfg = &info->cfg;
+ int core_id = cfg->base.core_begin;
+ size_t thread_cnt = cfg->ring.producer_cnt + cfg->ring.consumer_cnt;
+ pthread_t *threads = (pthread_t *)ut_malloc(UT_MODULE_COMMON, sizeof(pthread_t) * thread_cnt); // 存储所有线程ID的数组
+
+ pthread_barrier_init(&info->ctl.all_threads_start, NULL, thread_cnt);
+ info->ctl.running = true;
+ info->ctl.producer_exit = ATOMIC_VAR_INIT(0);
+
+ // MPSC 或 SPMC 场景在第一个核心/超线程上分配单个生产者或消费者,然后将其他线程按顺序分配给核心/超线程。
+ // MPMC,我们将生产者和消费者一一交错分配
+ // 如果数量不同,则在最后分配剩余部分。
+ if (cfg->ring.producer_cnt == 1 && cfg->ring.consumer_cnt >= 1) {
+ // SPMC,第一个核心给生产者,其他分配给消费者
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ for (uint32_t i = 0; i < cfg->ring.consumer_cnt; i++) {
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ }
+ } else if (cfg->ring.consumer_cnt == 1 && cfg->ring.producer_cnt >= 1) {
+ // MPSC,第一个核心给消费者,其他分配给生产者
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ for (uint32_t i = 0; i < cfg->ring.producer_cnt; i++) {
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ }
+ } else {
+ // MPMC 或 只有生产者 或这有消费者,核心交错分配
+ uint32_t pcnt = cfg->ring.producer_cnt; // 生产者个数
+ uint32_t ccnt = cfg->ring.consumer_cnt; // 消费者个数
+ for (core_id = cfg->base.core_begin; core_id < cfg->base.core_end && pcnt > 0 && ccnt > 0;) {
+ if ((core_id & 1) == 0) {
+ // 偶数
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ pcnt--;
+ } else {
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ ccnt--;
+ }
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ }
+
+ for (uint32_t i = 0; i < pcnt; i++) {
+ ut_one_thread_create(info, q, UT_THREAD_PRODUCER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ }
+
+ for (uint32_t i = 0; i < ccnt; i++) {
+ ut_one_thread_create(info, q, UT_THREAD_CONSUMER, core_id, thread_id, &(threads[thread_id]));
+ thread_id++;
+ CORE_ID_CHK_SET(core_id, cfg->base.core_end);
+ }
+ }
+
+ return threads;
+}
+
+void ut_threads_destory(struct ut_info_s *info, pthread_t *threads) {
+ pthread_barrier_destroy(&info->ctl.all_threads_start);
+ ut_free(UT_MODULE_COMMON, threads);
+}
+
+void ut_merge_data_detail(struct ut_merge_data *merge, struct ut_exit_data *exit_data) {
+ merge->run_times += exit_data->run_times;
+ merge->ok_cnt += exit_data->ok_cnt;
+ merge->latency_ns += exit_data->latency_ns;
+ merge->op_err_latency_ns = exit_data->op_err_latency_ns;
+ merge->op_ok_latency_ns += exit_data->op_ok_latency_ns;
+ merge->data_error_cnt += exit_data->data_error_cnt;
+}
+
+void ut_merge_all_data(struct ut_exit_data **exit_data, uint32_t thread_cnt, struct ut_merge_s *merge) {
+ struct ut_metric p_start = {0};
+ struct ut_metric p_end = {0};
+ struct ut_metric c_start = {0};
+ struct ut_metric c_end = {0};
+
+ for (uint32_t i = 0; i < thread_cnt; i++) {
+ // 根据生产者/消费者 线程最早开始和最晚结束,记录时间
+ if (exit_data[i]->arg->ttype == UT_THREAD_PRODUCER) {
+ if (ut_clock_time_is_zero(&p_start) || ut_timespec_is_after(&p_start.timestamp, &exit_data[i]->metric_start.timestamp)) {
+ p_start = exit_data[i]->metric_start;
+ }
+
+ if (ut_timespec_is_after(&exit_data[i]->metric_start.timestamp, &p_end.timestamp)) {
+ p_end = exit_data[i]->metric_end;
+ }
+
+ ut_merge_data_detail(&merge->producer, exit_data[i]);
+ } else {
+ if (ut_clock_time_is_zero(&c_start) || ut_timespec_is_after(&c_start.timestamp, &exit_data[i]->metric_start.timestamp)) {
+ c_start = exit_data[i]->metric_start;
+ }
+
+ if (ut_timespec_is_after(&exit_data[i]->metric_start.timestamp, &c_end.timestamp)) {
+ c_end = exit_data[i]->metric_end;
+ }
+
+ ut_merge_data_detail(&merge->consumer, exit_data[i]);
+ }
+ }
+
+ merge->producer.use_time = ut_clock_time_sub(p_end, p_start);
+ merge->consumer.use_time = ut_clock_time_sub(c_end, c_start);
+} \ No newline at end of file