diff options
| author | zhengchao <[email protected]> | 2022-01-16 21:06:13 +0500 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2022-01-24 14:53:45 +0000 |
| commit | 5355c60af7f33d4733722689fdba97055d73d582 (patch) | |
| tree | 1b57c5d818f67c1423fbd1c7a1d39519fad5005d | |
| parent | a0e9dba33fe250ae6ae6ddd613cca0e8ec74cf2a (diff) | |
为每个线程分配独立的计数器,以提升FS_operate多线程性能。其中使用thread local storage存储计数器下标。v2.10.0
| -rw-r--r-- | src/MESA_field_stat.cpp | 24 | ||||
| -rw-r--r-- | src/field_stat_internal.h | 23 | ||||
| -rw-r--r-- | src/threadsafe_counter.h | 76 | ||||
| -rw-r--r-- | test/fs2_test.cpp | 145 |
4 files changed, 183 insertions, 85 deletions
diff --git a/src/MESA_field_stat.cpp b/src/MESA_field_stat.cpp index c576547..6528a94 100644 --- a/src/MESA_field_stat.cpp +++ b/src/MESA_field_stat.cpp @@ -1,5 +1,6 @@ #include "field_stat2.h" #include "field_stat_internal.h" +#include "threadsafe_counter.h" #include "hdr_histogram.h" #include "cJSON.h" @@ -789,13 +790,13 @@ int FS_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op switch(op) { case FS_OP_ADD: - atomic_add(&(target->changing), value); + threadsafe_counter_add(&(target->changing), value); break; case FS_OP_SET: - atomic_set(&(target->changing), value-target->accumulated); + threadsafe_counter_set(&(target->changing), value-target->accumulated); break; case FS_OP_SUB: - atomic_sub(&(target->changing), value); + threadsafe_counter_sub(&(target->changing), value); break; default: assert(0); @@ -821,12 +822,12 @@ long long get_stat_unit_val(display_manifest_t* p, int column_seq,enum field_cal default: break; } - value= atomic_read(&(target->changing)); + value= threadsafe_counter_read(&(target->changing)); if(is_refer==0) { target->previous_changed=value; target->accumulated+=value; - atomic_set(&(target->changing), 0); + threadsafe_counter_set(&(target->changing), 0); } switch(calc_type) { @@ -1322,7 +1323,7 @@ static int output_style_histogram(struct FS_space_t* _handle, long long interval { pos--; } - pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n",draw_line); + pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n", draw_line); } return pos-print_buf; } @@ -1360,7 +1361,8 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp) cJSON_AddStringToObject(tmp_obj, "name", p->name); cJSON_AddStringToObject(tmp_obj, "type", "single"); cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated); - cJSON_AddNumberToObject(tmp_obj, "diff", p->single.changing); + value=threadsafe_counter_read(&p->single.changing); + cJSON_AddNumberToObject(tmp_obj, "diff", value); cJSON_AddItemToArray(metrics_array_obj, tmp_obj); 
//cJSON_Delete(tmp_obj); break; @@ -1371,8 +1373,9 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp) tmp_obj = cJSON_CreateObject(); cJSON_AddStringToObject(tmp_obj, "name", p->name); cJSON_AddStringToObject(tmp_obj, "type", "single"); - cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated); - cJSON_AddNumberToObject(tmp_obj, "diff", p->single.changing); + cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated); + value=threadsafe_counter_read(&p->single.changing); + cJSON_AddNumberToObject(tmp_obj, "diff", value); cJSON_AddItemToArray(metrics_array_obj, tmp_obj); //cJSON_Delete(tmp_obj); break; @@ -1390,7 +1393,8 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp) cJSON_AddStringToObject(tmp_obj, "name", tmp_output_name); //cJSON_AddStringToObject(tmp_obj, "type", "line"); cJSON_AddNumberToObject(tmp_obj, "acc", p_column->single.accumulated); - cJSON_AddNumberToObject(tmp_obj, "diff", p_column->single.changing); + value=threadsafe_counter_read(&p->single.changing); + cJSON_AddNumberToObject(tmp_obj, "diff", value); cJSON_AddItemToArray(metrics_array_obj, tmp_obj); //cJSON_Delete(tmp_obj); } diff --git a/src/field_stat_internal.h b/src/field_stat_internal.h index b5fdc22..5dbeb2e 100644 --- a/src/field_stat_internal.h +++ b/src/field_stat_internal.h @@ -3,26 +3,7 @@ #include <pthread.h> #include "hdr_histogram.h" - -#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410) -#define atomic_inc(x) __sync_add_and_fetch((x),1) -#define atomic_dec(x) __sync_sub_and_fetch((x),1) -#define atomic_add(x,y) __sync_add_and_fetch((x),(y)) -#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) -typedef long atomic_t; -#define ATOMIC_INIT(i) { (i) } -#define atomic_read(x) __sync_add_and_fetch((x),0) -#define atomic_set(x,y) __sync_lock_test_and_set((x),y) -#else -typedef long atomic_t; -#define atomic_inc(x) ((*(x))++) -#define atomic_dec(x) ((*(x))--) -#define atomic_add(x,y) ((*(x))+=(y)) -#define 
atomic_sub(x,y) ((*(x))-=(y)) -#define ATOMIC_INIT(i) { (i) } -#define atomic_read(x) (*(x)) -#define atomic_set(x,y) ((*(x))=(y)) -#endif +#include "threadsafe_counter.h" #define INIT_STAT_FIELD_NUM 1024 #define MAX_STAT_COLUMN_NUM 64 @@ -42,7 +23,7 @@ typedef long atomic_t; struct stat_unit_t { - long long changing; + struct threadsafe_counter changing; long long accumulated; long long previous_changed; }; diff --git a/src/threadsafe_counter.h b/src/threadsafe_counter.h new file mode 100644 index 0000000..8748764 --- /dev/null +++ b/src/threadsafe_counter.h @@ -0,0 +1,76 @@ +#pragma once +#include <stdlib.h> +#include <pthread.h> +#define SLOTS_COUNT 256 +#define CPU_CACHE_ALIGMENT 64 + +#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410) +#define atomic_inc(x) __sync_add_and_fetch((x),1) +#define atomic_dec(x) __sync_sub_and_fetch((x),1) +#define atomic_add(x,y) __sync_add_and_fetch((x),(y)) +#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) +typedef long atomic_t; +#define ATOMIC_INIT(i) { (i) } +#define atomic_read(x) __sync_add_and_fetch((x),0) +#define atomic_set(x,y) __sync_lock_test_and_set((x),y) +#else +typedef long atomic_t; +#define atomic_inc(x) ((*(x))++) +#define atomic_dec(x) ((*(x))--) +#define atomic_add(x,y) ((*(x))+=(y)) +#define atomic_sub(x,y) ((*(x))-=(y)) +#define ATOMIC_INIT(i) { (i) } +#define atomic_read(x) (*(x)) +#define atomic_set(x,y) ((*(x))=(y)) +#endif + +struct threadsafe_counter +{ + long long lcounters[(CPU_CACHE_ALIGMENT/sizeof(long long))*SLOTS_COUNT]; +}; +static __thread long slot_id=-1; +static __thread pid_t thread_id=0; +inline int get_lcounter_offset() +{ + int offset=0; + if(slot_id<0) + { + thread_id=pthread_self(); + slot_id=random()%SLOTS_COUNT; + } + offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*slot_id; + return offset; +} +inline void threadsafe_counter_add(struct threadsafe_counter* c, long long value) +{ + int offset=get_lcounter_offset(); + atomic_add(&(c->lcounters[offset]), value); +} 
+inline void threadsafe_counter_sub(struct threadsafe_counter* c, long long value) +{ + int offset=get_lcounter_offset(); + atomic_sub(&(c->lcounters[offset]), value); +} + +inline long long threadsafe_counter_read(struct threadsafe_counter* c) +{ + int i=0, offset=0; + long long value=0; + for(i=0; i<SLOTS_COUNT; i++) + { + offset=i*CPU_CACHE_ALIGMENT/sizeof(long long); + value+=atomic_read(&(c->lcounters[offset])); + } + return value; +} +inline void threadsafe_counter_set(struct threadsafe_counter* c, long long value) +{ + int i=0, offset=0; + for(i=0; i<SLOTS_COUNT; i++) + { + offset=i*CPU_CACHE_ALIGMENT/sizeof(long long); + atomic_set(&(c->lcounters[offset]), 0); + } + atomic_set(&(c->lcounters[0]), value); +} + diff --git a/test/fs2_test.cpp b/test/fs2_test.cpp index 610db3a..23db6df 100644 --- a/test/fs2_test.cpp +++ b/test/fs2_test.cpp @@ -4,6 +4,7 @@ #include <stdlib.h> #include <string.h> #include <unistd.h> +#include <pthread.h> #define TEST_STATUS_NUM 4 #define TEST_FIELD_NUM 9 @@ -14,14 +15,87 @@ #define TEST_RUNTIME_REG_NUM 32 #define TEST_RUNTIME_REG_LINE_NUM 6 +int status_ids[TEST_STATUS_NUM], field_ids[TEST_FIELD_NUM], line_ids[TEST_LINE_NUM + TEST_RUNTIME_REG_LINE_NUM], column_ids[TEST_COLUMN_NUM]; +int histogram_ids[TEST_HISTOGRAM_NUM]; +int runtime_status_ids[TEST_RUNTIME_REG_NUM]; +int runtime_reg_num = 0, runtime_reg_line_num = 0; + +struct thread_para +{ + int loops; + screen_stat_handle_t handle; + int thread_id; +}; +static void* worker_thread(void* arg) +{ + + struct thread_para* para=(struct thread_para*)arg; + int loops = para->loops, i=0, j=0; + screen_stat_handle_t handle=para->handle; + char buff[128]; + int ret=0; + while (loops > 0) + { + loops--; + for (i = 0; i < TEST_STATUS_NUM; i++) + { + FS_operate(handle, status_ids[i], 0, FS_OP_SET, i * 10); + } + for (i = 0; i < TEST_FIELD_NUM; i++) + { + FS_operate(handle, field_ids[i], 0, FS_OP_ADD, i * 100); + } + for (i = 0; i < TEST_LINE_NUM + runtime_reg_line_num; i++) + { + for (j = 
0; j < TEST_COLUMN_NUM; j++) + { + FS_operate(handle, line_ids[i], column_ids[j], FS_OP_ADD, (j + 1) * 30); + } + } + for (i = 0; i < runtime_reg_num; i++) + { + FS_operate(handle, runtime_status_ids[i], 0, FS_OP_ADD, i * 1000); + } + if (runtime_reg_num < TEST_RUNTIME_REG_NUM) + { + snprintf(buff, sizeof(buff), "rt_reg_%02d", runtime_reg_num); + ret = FS_register(handle, FS_STYLE_STATUS, FS_CALC_SPEED, buff); + assert(ret >= 0); + runtime_status_ids[runtime_reg_num] = ret; + runtime_reg_num++; + ret = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff); + assert(ret == -1); //always failed + } + if (runtime_reg_line_num < TEST_RUNTIME_REG_LINE_NUM) + { + snprintf(buff, sizeof(buff), "line_rt_%02d", runtime_reg_line_num); + ret = FS_register(handle, FS_STYLE_LINE, FS_CALC_SPEED, buff); + assert(ret >= 0); + line_ids[TEST_LINE_NUM + runtime_reg_line_num] = ret; + runtime_reg_line_num++; + } + long long preset[] = {1, 10, 20, 30, 40, 200, 300, 400, 600, 1000, 2000, 4000, 5000, 8000, 100000}; + for (i = 0; i < TEST_HISTOGRAM_NUM; i++) + { + for (j = 0; (size_t)j < sizeof(preset) / sizeof(long long); j++) + { + FS_operate(handle, histogram_ids[i], 0, FS_OP_SET, preset[j]); + } + } + sleep(1); + } + return NULL; +} + int main(int argc, char *argv[]) { screen_stat_handle_t handle = NULL; const char *stat_path = "./fs2_test.status"; const char *app_name = "fs2_test"; char buff[128]; - int value = 0, i = 0, j = 0, runtime_reg_num = 0, runtime_reg_line_num = 0, ret = 0; - int loops = 10; + int value = 0, i = 0; + + srand(171); unsigned short port = 9001; FS_library_set_prometheus_port(port); @@ -63,9 +137,7 @@ int main(int argc, char *argv[]) const char *histogram_format = "0.1,0.5,0.8,0.9,0.95,0.99"; FS_set_para(handle, HISTOGRAM_GLOBAL_BINS, histogram_format, strlen(histogram_format) + 1); - int status_ids[TEST_STATUS_NUM], field_ids[TEST_FIELD_NUM], line_ids[TEST_LINE_NUM + TEST_RUNTIME_REG_LINE_NUM], column_ids[TEST_COLUMN_NUM]; - int 
histogram_ids[TEST_HISTOGRAM_NUM]; - int runtime_status_ids[TEST_RUNTIME_REG_NUM]; + for (i = 0; i < TEST_STATUS_NUM; i++) { snprintf(buff, sizeof(buff), "(status_%02d)/\\-,;%%$*", i); @@ -112,57 +184,22 @@ int main(int argc, char *argv[]) FS_set_para(handle, NOT_SEND_METRIC_TO_SERVER, &value, sizeof(value)); FS_start(handle); - loops = 10; - while (loops > 0) + int thread_num=16; + pthread_t threads[thread_num]; + struct thread_para para; + para.loops=10; + para.handle=handle; + for(i=0; i<thread_num; i++) { - loops--; - for (i = 0; i < TEST_STATUS_NUM; i++) - { - FS_operate(handle, status_ids[i], 0, FS_OP_SET, i * 10); - } - for (i = 0; i < TEST_FIELD_NUM; i++) - { - FS_operate(handle, field_ids[i], 0, FS_OP_ADD, i * 100); - } - for (i = 0; i < TEST_LINE_NUM + runtime_reg_line_num; i++) - { - for (j = 0; j < TEST_COLUMN_NUM; j++) - { - FS_operate(handle, line_ids[i], column_ids[j], FS_OP_ADD, (j + 1) * 30); - } - } - for (i = 0; i < runtime_reg_num; i++) - { - FS_operate(handle, runtime_status_ids[i], 0, FS_OP_ADD, i * 1000); - } - if (runtime_reg_num < TEST_RUNTIME_REG_NUM) - { - snprintf(buff, sizeof(buff), "rt_reg_%02d", runtime_reg_num); - ret = FS_register(handle, FS_STYLE_STATUS, FS_CALC_SPEED, buff); - assert(ret >= 0); - runtime_status_ids[runtime_reg_num] = ret; - runtime_reg_num++; - ret = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff); - assert(ret == -1); //always failed - } - if (runtime_reg_line_num < TEST_RUNTIME_REG_LINE_NUM) - { - snprintf(buff, sizeof(buff), "line_rt_%02d", runtime_reg_line_num); - ret = FS_register(handle, FS_STYLE_LINE, FS_CALC_SPEED, buff); - assert(ret >= 0); - line_ids[TEST_LINE_NUM + runtime_reg_line_num] = ret; - runtime_reg_line_num++; - } - long long preset[] = {1, 10, 20, 30, 40, 200, 300, 400, 600, 1000, 2000, 4000, 5000, 8000, 100000}; - for (i = 0; i < TEST_HISTOGRAM_NUM; i++) - { - for (j = 0; (size_t)j < sizeof(preset) / sizeof(long long); j++) - { - FS_operate(handle, histogram_ids[i], 0, FS_OP_SET, 
preset[j]); - } - } - sleep(1); + pthread_create(&(threads[i]), NULL, worker_thread, &para); + } + void *temp; + for(i=0; i<thread_num; i++) + { + pthread_join(threads[i], (void**)&temp); } + + FS_stop(&handle); printf("fs2 stoped cnt = %d, will sleep 1s\n", repeat_cnt); sleep(1); |
