summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhengchao <[email protected]>2022-01-16 21:06:13 +0500
committer郑超 <[email protected]>2022-01-24 14:53:45 +0000
commit5355c60af7f33d4733722689fdba97055d73d582 (patch)
tree1b57c5d818f67c1423fbd1c7a1d39519fad5005d
parenta0e9dba33fe250ae6ae6ddd613cca0e8ec74cf2a (diff)
为每个线程分配独立的计数器,以提升FS_operate多线程性能。其中使用thread local storage存储计数器下标。v2.10.0
-rw-r--r--src/MESA_field_stat.cpp24
-rw-r--r--src/field_stat_internal.h23
-rw-r--r--src/threadsafe_counter.h76
-rw-r--r--test/fs2_test.cpp145
4 files changed, 183 insertions, 85 deletions
diff --git a/src/MESA_field_stat.cpp b/src/MESA_field_stat.cpp
index c576547..6528a94 100644
--- a/src/MESA_field_stat.cpp
+++ b/src/MESA_field_stat.cpp
@@ -1,5 +1,6 @@
#include "field_stat2.h"
#include "field_stat_internal.h"
+#include "threadsafe_counter.h"
#include "hdr_histogram.h"
#include "cJSON.h"
@@ -789,13 +790,13 @@ int FS_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op
switch(op)
{
case FS_OP_ADD:
- atomic_add(&(target->changing), value);
+ threadsafe_counter_add(&(target->changing), value);
break;
case FS_OP_SET:
- atomic_set(&(target->changing), value-target->accumulated);
+ threadsafe_counter_set(&(target->changing), value-target->accumulated);
break;
case FS_OP_SUB:
- atomic_sub(&(target->changing), value);
+ threadsafe_counter_sub(&(target->changing), value);
break;
default:
assert(0);
@@ -821,12 +822,12 @@ long long get_stat_unit_val(display_manifest_t* p, int column_seq,enum field_cal
default:
break;
}
- value= atomic_read(&(target->changing));
+ value= threadsafe_counter_read(&(target->changing));
if(is_refer==0)
{
target->previous_changed=value;
target->accumulated+=value;
- atomic_set(&(target->changing), 0);
+ threadsafe_counter_set(&(target->changing), 0);
}
switch(calc_type)
{
@@ -1322,7 +1323,7 @@ static int output_style_histogram(struct FS_space_t* _handle, long long interval
{
pos--;
}
- pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n",draw_line);
+ pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n", draw_line);
}
return pos-print_buf;
}
@@ -1360,7 +1361,8 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp)
cJSON_AddStringToObject(tmp_obj, "name", p->name);
cJSON_AddStringToObject(tmp_obj, "type", "single");
cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated);
- cJSON_AddNumberToObject(tmp_obj, "diff", p->single.changing);
+ value=threadsafe_counter_read(&p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "diff", value);
cJSON_AddItemToArray(metrics_array_obj, tmp_obj);
//cJSON_Delete(tmp_obj);
break;
@@ -1371,8 +1373,9 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp)
tmp_obj = cJSON_CreateObject();
cJSON_AddStringToObject(tmp_obj, "name", p->name);
cJSON_AddStringToObject(tmp_obj, "type", "single");
- cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated);
- cJSON_AddNumberToObject(tmp_obj, "diff", p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated);
+ value=threadsafe_counter_read(&p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "diff", value);
cJSON_AddItemToArray(metrics_array_obj, tmp_obj);
//cJSON_Delete(tmp_obj);
break;
@@ -1390,7 +1393,8 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp)
cJSON_AddStringToObject(tmp_obj, "name", tmp_output_name);
//cJSON_AddStringToObject(tmp_obj, "type", "line");
cJSON_AddNumberToObject(tmp_obj, "acc", p_column->single.accumulated);
- cJSON_AddNumberToObject(tmp_obj, "diff", p_column->single.changing);
+ // Report the column cell's own delta so that "diff" comes from the same unit as "acc" (p_column, not the enclosing item p).
+ value=threadsafe_counter_read(&p_column->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "diff", value);
cJSON_AddItemToArray(metrics_array_obj, tmp_obj);
//cJSON_Delete(tmp_obj);
}
diff --git a/src/field_stat_internal.h b/src/field_stat_internal.h
index b5fdc22..5dbeb2e 100644
--- a/src/field_stat_internal.h
+++ b/src/field_stat_internal.h
@@ -3,26 +3,7 @@
#include <pthread.h>
#include "hdr_histogram.h"
-
-#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410)
-#define atomic_inc(x) __sync_add_and_fetch((x),1)
-#define atomic_dec(x) __sync_sub_and_fetch((x),1)
-#define atomic_add(x,y) __sync_add_and_fetch((x),(y))
-#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y))
-typedef long atomic_t;
-#define ATOMIC_INIT(i) { (i) }
-#define atomic_read(x) __sync_add_and_fetch((x),0)
-#define atomic_set(x,y) __sync_lock_test_and_set((x),y)
-#else
-typedef long atomic_t;
-#define atomic_inc(x) ((*(x))++)
-#define atomic_dec(x) ((*(x))--)
-#define atomic_add(x,y) ((*(x))+=(y))
-#define atomic_sub(x,y) ((*(x))-=(y))
-#define ATOMIC_INIT(i) { (i) }
-#define atomic_read(x) (*(x))
-#define atomic_set(x,y) ((*(x))=(y))
-#endif
+#include "threadsafe_counter.h"
#define INIT_STAT_FIELD_NUM 1024
#define MAX_STAT_COLUMN_NUM 64
@@ -42,7 +23,7 @@ typedef long atomic_t;
struct stat_unit_t /* one statistic cell: a live delta plus the totals folded in by get_stat_unit_val() */
{
- long long changing;
+ struct threadsafe_counter changing; /* delta updated by FS_operate(); get_stat_unit_val() reads it and, when is_refer==0, resets it to 0 */
long long accumulated; /* running total: get_stat_unit_val() adds the value read from `changing` when is_refer==0 */
long long previous_changed; /* the value read from `changing` at the last is_refer==0 call of get_stat_unit_val() */
};
diff --git a/src/threadsafe_counter.h b/src/threadsafe_counter.h
new file mode 100644
index 0000000..8748764
--- /dev/null
+++ b/src/threadsafe_counter.h
@@ -0,0 +1,76 @@
+#pragma once
+#include <stdlib.h>
+#include <pthread.h>
+#define SLOTS_COUNT 256
+#define CPU_CACHE_ALIGMENT 64
+
+#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410) /* GCC-compatible compiler whose encoded version is at least 4.1.0: map the helpers onto the __sync builtins */
+#define atomic_inc(x) __sync_add_and_fetch((x),1) /* atomically adds 1 to *x; yields the NEW value */
+#define atomic_dec(x) __sync_sub_and_fetch((x),1) /* atomically subtracts 1 from *x; yields the NEW value */
+#define atomic_add(x,y) __sync_add_and_fetch((x),(y)) /* atomically adds y to *x; yields the NEW value */
+#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y)) /* atomically subtracts y from *x; yields the NEW value */
+typedef long atomic_t; /* not referenced by the counter code visible in this header, which works on long long slots */
+#define ATOMIC_INIT(i) { (i) } /* expands to a brace-enclosed initializer */
+#define atomic_read(x) __sync_add_and_fetch((x),0) /* atomic load written as "add 0", so x must point to writable storage */
+#define atomic_set(x,y) __sync_lock_test_and_set((x),y) /* atomic exchange: stores y into *x; yields the PREVIOUS value */
+#else /* any other compiler: plain read-modify-write expressions, which are NOT atomic */
+typedef long atomic_t;
+#define atomic_inc(x) ((*(x))++) /* post-increment: yields the OLD value, unlike the builtin branch */
+#define atomic_dec(x) ((*(x))--) /* post-decrement: yields the OLD value, unlike the builtin branch */
+#define atomic_add(x,y) ((*(x))+=(y))
+#define atomic_sub(x,y) ((*(x))-=(y))
+#define ATOMIC_INIT(i) { (i) }
+#define atomic_read(x) (*(x))
+#define atomic_set(x,y) ((*(x))=(y)) /* yields the NEW value, unlike the builtin branch */
+#endif
+
+struct threadsafe_counter /* one logical counter stored as SLOTS_COUNT separate slots; its value is the sum of all slots */
+{
+ long long lcounters[(CPU_CACHE_ALIGMENT/sizeof(long long))*SLOTS_COUNT]; /* slot i lives at index i*(CPU_CACHE_ALIGMENT/sizeof(long long)), i.e. slots are CPU_CACHE_ALIGMENT bytes apart; the elements in between are never accessed by the functions in this header */
+};
+/* Slot number chosen by the calling thread; stays -1 until the thread's first call to get_lcounter_offset(). */
+static __thread long slot_id=-1;
+/*
+ * Returns the index into threadsafe_counter::lcounters of the calling thread's slot.
+ *
+ * On a thread's first call a slot in [0, SLOTS_COUNT) is picked with random() and
+ * cached in thread-local storage; later calls reuse it. Two threads may pick the
+ * same slot; nothing here prevents that.
+ *
+ * Declared static inline: the function reads an internal-linkage thread-local
+ * variable and is defined in a header, so every translation unit must get its
+ * own private copy. The former thread_id variable (a pid_t assigned from
+ * pthread_self() and never read) has been dropped.
+ */
+static inline int get_lcounter_offset(void)
+{
+    if(slot_id<0)
+    {
+        slot_id=random()%SLOTS_COUNT;
+    }
+    return (int)((CPU_CACHE_ALIGMENT/sizeof(long long))*slot_id);
+}
+/*
+ * Adds `value` to counter `c` by applying atomic_add() to the calling thread's slot.
+ * Declared static inline so that each translation unit including this header gets
+ * its own definition (the slot lookup depends on file-local thread-local state).
+ */
+static inline void threadsafe_counter_add(struct threadsafe_counter* c, long long value)
+{
+    atomic_add(&(c->lcounters[get_lcounter_offset()]), value);
+}
+/*
+ * Subtracts `value` from counter `c` by applying atomic_sub() to the calling thread's slot.
+ * Declared static inline so that each translation unit including this header gets
+ * its own definition (the slot lookup depends on file-local thread-local state).
+ */
+static inline void threadsafe_counter_sub(struct threadsafe_counter* c, long long value)
+{
+    atomic_sub(&(c->lcounters[get_lcounter_offset()]), value);
+}
+
+/*
+ * Returns the current value of counter `c`: the sum of all SLOTS_COUNT slots,
+ * each fetched with atomic_read().
+ *
+ * The slots are read one after another, so the result is not a single-instant
+ * snapshot when other threads update the counter during the call.
+ * Declared static inline so the header can be included from several translation units.
+ */
+static inline long long threadsafe_counter_read(struct threadsafe_counter* c)
+{
+    const int stride=CPU_CACHE_ALIGMENT/sizeof(long long); /* distance between two slots, in elements */
+    long long value=0;
+    int i=0;
+    for(i=0; i<SLOTS_COUNT; i++)
+    {
+        value+=atomic_read(&(c->lcounters[i*stride]));
+    }
+    return value;
+}
+/*
+ * Makes counter `c` hold `value`: every slot except slot 0 is cleared and slot 0
+ * receives `value`, so a following threadsafe_counter_read() sums to `value`.
+ *
+ * Slot 0 is written exactly once (it used to be zeroed and then overwritten).
+ * The slots are written one after another, so the operation as a whole is not
+ * atomic: an add/sub that another thread performs during the call may be
+ * either kept or overwritten.
+ * Declared static inline so the header can be included from several translation units.
+ */
+static inline void threadsafe_counter_set(struct threadsafe_counter* c, long long value)
+{
+    const int stride=CPU_CACHE_ALIGMENT/sizeof(long long); /* distance between two slots, in elements */
+    int i=0;
+    for(i=1; i<SLOTS_COUNT; i++)
+    {
+        atomic_set(&(c->lcounters[i*stride]), 0);
+    }
+    atomic_set(&(c->lcounters[0]), value);
+}
+
diff --git a/test/fs2_test.cpp b/test/fs2_test.cpp
index 610db3a..23db6df 100644
--- a/test/fs2_test.cpp
+++ b/test/fs2_test.cpp
@@ -4,6 +4,7 @@
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#include <pthread.h>
#define TEST_STATUS_NUM 4
#define TEST_FIELD_NUM 9
@@ -14,14 +15,87 @@
#define TEST_RUNTIME_REG_NUM 32
#define TEST_RUNTIME_REG_LINE_NUM 6
+int status_ids[TEST_STATUS_NUM], field_ids[TEST_FIELD_NUM], line_ids[TEST_LINE_NUM + TEST_RUNTIME_REG_LINE_NUM], column_ids[TEST_COLUMN_NUM]; /* ids passed to FS_operate() by worker_thread(); line_ids has TEST_RUNTIME_REG_LINE_NUM extra entries for lines registered while the workers run */
+int histogram_ids[TEST_HISTOGRAM_NUM]; /* ids that worker_thread() updates with FS_OP_SET */
+int runtime_status_ids[TEST_RUNTIME_REG_NUM]; /* ids returned by FS_register() calls made inside worker_thread() */
+int runtime_reg_num = 0, runtime_reg_line_num = 0; /* how many entries of runtime_status_ids / of the extra part of line_ids are filled; shared by all worker threads */
+
+struct thread_para /* argument block handed to worker_thread() through pthread_create() */
+{
+ int loops; /* number of iterations of the worker's main loop */
+ screen_stat_handle_t handle; /* handle the worker passes to FS_operate() and FS_register() */
+ int thread_id; /* NOTE(review): neither assigned nor read in the code visible here */
+};
+/*
+ * Thread entry point of the multi-threaded FS_operate() test.
+ *
+ * arg points to a struct thread_para. For para->loops iterations the worker
+ * updates every status, field, line/column, run-time registered and histogram
+ * item through FS_operate(), registers at most one new status item and one new
+ * line per iteration, then sleeps for one second. Always returns NULL.
+ *
+ * The run-time registration tables (runtime_reg_num, runtime_reg_line_num,
+ * runtime_status_ids, line_ids) are shared by all workers. They are therefore
+ * only modified while holding reg_lock, and the counts used as loop bounds are
+ * snapshotted under the same lock. Without it two workers could both pass the
+ * "< limit" check and write one element past the end of the id arrays, or
+ * register the same name twice.
+ */
+static void* worker_thread(void* arg)
+{
+    static pthread_mutex_t reg_lock = PTHREAD_MUTEX_INITIALIZER;
+    struct thread_para* para=(struct thread_para*)arg;
+    int loops = para->loops, i=0, j=0;
+    int reg_num=0, reg_line_num=0;
+    screen_stat_handle_t handle=para->handle;
+    char buff[128];
+    int ret=0;
+    while (loops > 0)
+    {
+        loops--;
+        for (i = 0; i < TEST_STATUS_NUM; i++)
+        {
+            FS_operate(handle, status_ids[i], 0, FS_OP_SET, i * 10);
+        }
+        for (i = 0; i < TEST_FIELD_NUM; i++)
+        {
+            FS_operate(handle, field_ids[i], 0, FS_OP_ADD, i * 100);
+        }
+        // Snapshot how many run-time items exist; ids below these counts are fully written.
+        pthread_mutex_lock(&reg_lock);
+        reg_num = runtime_reg_num;
+        reg_line_num = runtime_reg_line_num;
+        pthread_mutex_unlock(&reg_lock);
+        for (i = 0; i < TEST_LINE_NUM + reg_line_num; i++)
+        {
+            for (j = 0; j < TEST_COLUMN_NUM; j++)
+            {
+                FS_operate(handle, line_ids[i], column_ids[j], FS_OP_ADD, (j + 1) * 30);
+            }
+        }
+        for (i = 0; i < reg_num; i++)
+        {
+            FS_operate(handle, runtime_status_ids[i], 0, FS_OP_ADD, i * 1000);
+        }
+        // Check-and-register must be one critical section so each name and array index is used once.
+        pthread_mutex_lock(&reg_lock);
+        if (runtime_reg_num < TEST_RUNTIME_REG_NUM)
+        {
+            snprintf(buff, sizeof(buff), "rt_reg_%02d", runtime_reg_num);
+            ret = FS_register(handle, FS_STYLE_STATUS, FS_CALC_SPEED, buff);
+            assert(ret >= 0);
+            runtime_status_ids[runtime_reg_num] = ret;
+            runtime_reg_num++;
+            ret = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff);
+            assert(ret == -1); //always fails
+        }
+        if (runtime_reg_line_num < TEST_RUNTIME_REG_LINE_NUM)
+        {
+            snprintf(buff, sizeof(buff), "line_rt_%02d", runtime_reg_line_num);
+            ret = FS_register(handle, FS_STYLE_LINE, FS_CALC_SPEED, buff);
+            assert(ret >= 0);
+            line_ids[TEST_LINE_NUM + runtime_reg_line_num] = ret;
+            runtime_reg_line_num++;
+        }
+        pthread_mutex_unlock(&reg_lock);
+        long long preset[] = {1, 10, 20, 30, 40, 200, 300, 400, 600, 1000, 2000, 4000, 5000, 8000, 100000};
+        for (i = 0; i < TEST_HISTOGRAM_NUM; i++)
+        {
+            for (j = 0; (size_t)j < sizeof(preset) / sizeof(long long); j++)
+            {
+                FS_operate(handle, histogram_ids[i], 0, FS_OP_SET, preset[j]);
+            }
+        }
+        sleep(1);
+    }
+    return NULL;
+}
+
int main(int argc, char *argv[])
{
screen_stat_handle_t handle = NULL;
const char *stat_path = "./fs2_test.status";
const char *app_name = "fs2_test";
char buff[128];
- int value = 0, i = 0, j = 0, runtime_reg_num = 0, runtime_reg_line_num = 0, ret = 0;
- int loops = 10;
+ int value = 0, i = 0;
+
+ srand(171);
unsigned short port = 9001;
FS_library_set_prometheus_port(port);
@@ -63,9 +137,7 @@ int main(int argc, char *argv[])
const char *histogram_format = "0.1,0.5,0.8,0.9,0.95,0.99";
FS_set_para(handle, HISTOGRAM_GLOBAL_BINS, histogram_format, strlen(histogram_format) + 1);
- int status_ids[TEST_STATUS_NUM], field_ids[TEST_FIELD_NUM], line_ids[TEST_LINE_NUM + TEST_RUNTIME_REG_LINE_NUM], column_ids[TEST_COLUMN_NUM];
- int histogram_ids[TEST_HISTOGRAM_NUM];
- int runtime_status_ids[TEST_RUNTIME_REG_NUM];
+
for (i = 0; i < TEST_STATUS_NUM; i++)
{
snprintf(buff, sizeof(buff), "(status_%02d)/\\-,;%%$*", i);
@@ -112,57 +184,22 @@ int main(int argc, char *argv[])
FS_set_para(handle, NOT_SEND_METRIC_TO_SERVER, &value, sizeof(value));
FS_start(handle);
- loops = 10;
- while (loops > 0)
+ int thread_num=16;
+ pthread_t threads[thread_num];
+ struct thread_para para;
+ para.loops=10;
+ para.handle=handle;
+ for(i=0; i<thread_num; i++)
{
- loops--;
- for (i = 0; i < TEST_STATUS_NUM; i++)
- {
- FS_operate(handle, status_ids[i], 0, FS_OP_SET, i * 10);
- }
- for (i = 0; i < TEST_FIELD_NUM; i++)
- {
- FS_operate(handle, field_ids[i], 0, FS_OP_ADD, i * 100);
- }
- for (i = 0; i < TEST_LINE_NUM + runtime_reg_line_num; i++)
- {
- for (j = 0; j < TEST_COLUMN_NUM; j++)
- {
- FS_operate(handle, line_ids[i], column_ids[j], FS_OP_ADD, (j + 1) * 30);
- }
- }
- for (i = 0; i < runtime_reg_num; i++)
- {
- FS_operate(handle, runtime_status_ids[i], 0, FS_OP_ADD, i * 1000);
- }
- if (runtime_reg_num < TEST_RUNTIME_REG_NUM)
- {
- snprintf(buff, sizeof(buff), "rt_reg_%02d", runtime_reg_num);
- ret = FS_register(handle, FS_STYLE_STATUS, FS_CALC_SPEED, buff);
- assert(ret >= 0);
- runtime_status_ids[runtime_reg_num] = ret;
- runtime_reg_num++;
- ret = FS_register(handle, FS_STYLE_COLUMN, FS_CALC_SPEED, buff);
- assert(ret == -1); //always failed
- }
- if (runtime_reg_line_num < TEST_RUNTIME_REG_LINE_NUM)
- {
- snprintf(buff, sizeof(buff), "line_rt_%02d", runtime_reg_line_num);
- ret = FS_register(handle, FS_STYLE_LINE, FS_CALC_SPEED, buff);
- assert(ret >= 0);
- line_ids[TEST_LINE_NUM + runtime_reg_line_num] = ret;
- runtime_reg_line_num++;
- }
- long long preset[] = {1, 10, 20, 30, 40, 200, 300, 400, 600, 1000, 2000, 4000, 5000, 8000, 100000};
- for (i = 0; i < TEST_HISTOGRAM_NUM; i++)
- {
- for (j = 0; (size_t)j < sizeof(preset) / sizeof(long long); j++)
- {
- FS_operate(handle, histogram_ids[i], 0, FS_OP_SET, preset[j]);
- }
- }
- sleep(1);
+ pthread_create(&(threads[i]), NULL, worker_thread, &para);
+ }
+ void *temp;
+ for(i=0; i<thread_num; i++)
+ {
+ pthread_join(threads[i], (void**)&temp);
}
+
+
FS_stop(&handle);
printf("fs2 stoped cnt = %d, will sleep 1s\n", repeat_cnt);
sleep(1);