summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/MESA_field_stat.cpp24
-rw-r--r--src/field_stat_internal.h23
-rw-r--r--src/threadsafe_counter.h76
3 files changed, 92 insertions, 31 deletions
diff --git a/src/MESA_field_stat.cpp b/src/MESA_field_stat.cpp
index c576547..6528a94 100644
--- a/src/MESA_field_stat.cpp
+++ b/src/MESA_field_stat.cpp
@@ -1,5 +1,6 @@
#include "field_stat2.h"
#include "field_stat_internal.h"
+#include "threadsafe_counter.h"
#include "hdr_histogram.h"
#include "cJSON.h"
@@ -789,13 +790,13 @@ int FS_operate(screen_stat_handle_t handle,int id,int column_id,enum field_op op
switch(op)
{
case FS_OP_ADD:
- atomic_add(&(target->changing), value);
+ threadsafe_counter_add(&(target->changing), value);
break;
case FS_OP_SET:
- atomic_set(&(target->changing), value-target->accumulated);
+ threadsafe_counter_set(&(target->changing), value-target->accumulated);
break;
case FS_OP_SUB:
- atomic_sub(&(target->changing), value);
+ threadsafe_counter_sub(&(target->changing), value);
break;
default:
assert(0);
@@ -821,12 +822,12 @@ long long get_stat_unit_val(display_manifest_t* p, int column_seq,enum field_cal
default:
break;
}
- value= atomic_read(&(target->changing));
+ value= threadsafe_counter_read(&(target->changing));
if(is_refer==0)
{
target->previous_changed=value;
target->accumulated+=value;
- atomic_set(&(target->changing), 0);
+ threadsafe_counter_set(&(target->changing), 0);
}
switch(calc_type)
{
@@ -1322,7 +1323,7 @@ static int output_style_histogram(struct FS_space_t* _handle, long long interval
{
pos--;
}
- pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n",draw_line);
+ pos+=snprintf(pos,size-(pos-print_buf),"\n%s\n", draw_line);
}
return pos-print_buf;
}
@@ -1360,7 +1361,8 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp)
cJSON_AddStringToObject(tmp_obj, "name", p->name);
cJSON_AddStringToObject(tmp_obj, "type", "single");
cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated);
- cJSON_AddNumberToObject(tmp_obj, "diff", p->single.changing);
+ value=threadsafe_counter_read(&p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "diff", value);
cJSON_AddItemToArray(metrics_array_obj, tmp_obj);
//cJSON_Delete(tmp_obj);
break;
@@ -1371,8 +1373,9 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp)
tmp_obj = cJSON_CreateObject();
cJSON_AddStringToObject(tmp_obj, "name", p->name);
cJSON_AddStringToObject(tmp_obj, "type", "single");
- cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated);
- cJSON_AddNumberToObject(tmp_obj, "diff", p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "acc", p->single.accumulated);
+ value=threadsafe_counter_read(&p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "diff", value);
cJSON_AddItemToArray(metrics_array_obj, tmp_obj);
//cJSON_Delete(tmp_obj);
break;
@@ -1390,7 +1393,8 @@ cJSON *fs2_metrics_to_json(FS_space_t* _handle, long long timestamp)
cJSON_AddStringToObject(tmp_obj, "name", tmp_output_name);
//cJSON_AddStringToObject(tmp_obj, "type", "line");
cJSON_AddNumberToObject(tmp_obj, "acc", p_column->single.accumulated);
- cJSON_AddNumberToObject(tmp_obj, "diff", p_column->single.changing);
+ value=threadsafe_counter_read(&p->single.changing);
+ cJSON_AddNumberToObject(tmp_obj, "diff", value);
cJSON_AddItemToArray(metrics_array_obj, tmp_obj);
//cJSON_Delete(tmp_obj);
}
diff --git a/src/field_stat_internal.h b/src/field_stat_internal.h
index b5fdc22..5dbeb2e 100644
--- a/src/field_stat_internal.h
+++ b/src/field_stat_internal.h
@@ -3,26 +3,7 @@
#include <pthread.h>
#include "hdr_histogram.h"
-
-#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410)
-#define atomic_inc(x) __sync_add_and_fetch((x),1)
-#define atomic_dec(x) __sync_sub_and_fetch((x),1)
-#define atomic_add(x,y) __sync_add_and_fetch((x),(y))
-#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y))
-typedef long atomic_t;
-#define ATOMIC_INIT(i) { (i) }
-#define atomic_read(x) __sync_add_and_fetch((x),0)
-#define atomic_set(x,y) __sync_lock_test_and_set((x),y)
-#else
-typedef long atomic_t;
-#define atomic_inc(x) ((*(x))++)
-#define atomic_dec(x) ((*(x))--)
-#define atomic_add(x,y) ((*(x))+=(y))
-#define atomic_sub(x,y) ((*(x))-=(y))
-#define ATOMIC_INIT(i) { (i) }
-#define atomic_read(x) (*(x))
-#define atomic_set(x,y) ((*(x))=(y))
-#endif
+#include "threadsafe_counter.h"
#define INIT_STAT_FIELD_NUM 1024
#define MAX_STAT_COLUMN_NUM 64
@@ -42,7 +23,7 @@ typedef long atomic_t;
struct stat_unit_t
{
- long long changing;
+ struct threadsafe_counter changing;
long long accumulated;
long long previous_changed;
};
diff --git a/src/threadsafe_counter.h b/src/threadsafe_counter.h
new file mode 100644
index 0000000..8748764
--- /dev/null
+++ b/src/threadsafe_counter.h
@@ -0,0 +1,76 @@
+#pragma once
+#include <stdlib.h>
+#include <pthread.h>
+#define SLOTS_COUNT 256
+#define CPU_CACHE_ALIGMENT 64
+
+#if(__GNUC__ * 100 + __GNUC_MINOR__ * 10 + __GNUC_PATCHLEVEL__ >= 410)
+#define atomic_inc(x) __sync_add_and_fetch((x),1)
+#define atomic_dec(x) __sync_sub_and_fetch((x),1)
+#define atomic_add(x,y) __sync_add_and_fetch((x),(y))
+#define atomic_sub(x,y) __sync_sub_and_fetch((x),(y))
+typedef long atomic_t;
+#define ATOMIC_INIT(i) { (i) }
+#define atomic_read(x) __sync_add_and_fetch((x),0)
+#define atomic_set(x,y) __sync_lock_test_and_set((x),y)
+#else
+typedef long atomic_t;
+#define atomic_inc(x) ((*(x))++)
+#define atomic_dec(x) ((*(x))--)
+#define atomic_add(x,y) ((*(x))+=(y))
+#define atomic_sub(x,y) ((*(x))-=(y))
+#define ATOMIC_INIT(i) { (i) }
+#define atomic_read(x) (*(x))
+#define atomic_set(x,y) ((*(x))=(y))
+#endif
+
+struct threadsafe_counter
+{
+ long long lcounters[(CPU_CACHE_ALIGMENT/sizeof(long long))*SLOTS_COUNT];
+};
+static __thread long slot_id=-1;
+static __thread pid_t thread_id=0;
+inline int get_lcounter_offset()
+{
+ int offset=0;
+ if(slot_id<0)
+ {
+ thread_id=pthread_self();
+ slot_id=random()%SLOTS_COUNT;
+ }
+ offset=(CPU_CACHE_ALIGMENT/sizeof(long long))*slot_id;
+ return offset;
+}
+inline void threadsafe_counter_add(struct threadsafe_counter* c, long long value)
+{
+ int offset=get_lcounter_offset();
+ atomic_add(&(c->lcounters[offset]), value);
+}
+inline void threadsafe_counter_sub(struct threadsafe_counter* c, long long value)
+{
+ int offset=get_lcounter_offset();
+ atomic_sub(&(c->lcounters[offset]), value);
+}
+
+inline long long threadsafe_counter_read(struct threadsafe_counter* c)
+{
+ int i=0, offset=0;
+ long long value=0;
+ for(i=0; i<SLOTS_COUNT; i++)
+ {
+ offset=i*CPU_CACHE_ALIGMENT/sizeof(long long);
+ value+=atomic_read(&(c->lcounters[offset]));
+ }
+ return value;
+}
+inline void threadsafe_counter_set(struct threadsafe_counter* c, long long value)
+{
+ int i=0, offset=0;
+ for(i=0; i<SLOTS_COUNT; i++)
+ {
+ offset=i*CPU_CACHE_ALIGMENT/sizeof(long long);
+ atomic_set(&(c->lcounters[offset]), 0);
+ }
+ atomic_set(&(c->lcounters[0]), value);
+}
+