summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfumingwei <[email protected]>2023-03-14 16:34:18 +0800
committerfumingwei <[email protected]>2023-03-14 16:34:18 +0800
commit9dbf155d1f115d588b88ffb5eac86e08e2222c1f (patch)
tree5c3a40277ed10879a101d41593aee9e8e93f0377
parent13638b967b68b4f60a487ce2ec12c5c7b81d9ba6 (diff)
feature:过滤lineprotocol和prometheus输出
-rw-r--r--src/fieldstat.cpp23
-rw-r--r--src/fieldstat_internal.h1
-rw-r--r--src/file_output.cpp52
-rw-r--r--src/line_protocol_output.cpp64
-rw-r--r--src/prometheus_output.cpp26
-rw-r--r--test/fieldstat_test.cpp63
6 files changed, 185 insertions, 44 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index d1bf882..a50cb03 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -49,6 +49,18 @@ int is_valid_field_name(const char* name)
}
+void get_current_table_line_cnt(struct fieldstat_instance *instance, int n_table, int *tables_line_cnt)
+{
+ for(int i = 0; i < n_table; i++)
+ {
+ if(instance->table_metrics[i] == NULL)
+ {
+ continue;
+ }
+ tables_line_cnt[i] = instance->table_metrics[i]->line_cnt;
+ }
+}
+
struct metric_t * get_metric(struct fieldstat_instance *instance, int metric_id)
{
int block_index = 0;
@@ -316,7 +328,6 @@ int fieldstat_register(struct fieldstat_instance *instance, enum field_type type
metric_id = atomic_inc(&instance->metric_cnt) - 1;
metric_slot = read_metric_slot(instance, metric_id);
metric = metric_new(type,field_name,tag_key,tag_value,n_tag);
- *metric_slot = metric;
switch(type)
{
@@ -329,6 +340,7 @@ int fieldstat_register(struct fieldstat_instance *instance, enum field_type type
default:
assert(0);
}
+ *metric_slot = metric;
return metric_id;
}
@@ -347,13 +359,15 @@ long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_
default:
break;
}
+
value = threadsafe_counter_read(&(target->changing));
//value= threadsafe_counter_read(&(target->changing));
if(is_refer == 0)
{
target->previous_changed = value;
target->accumulated += value;
- threadsafe_counter_set(&(target->changing), 0);
+// threadsafe_counter_set(&(target->changing), 0);
+ threadsafe_counter_sub(&(target->changing), value);
}
switch(calc_type)
{
@@ -564,7 +578,6 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance
line_id = atomic_inc(&(table->line_cnt)) - 1;
line_slot = read_table_line_slot(table,line_id);
table_line = table_line_new(line_name, tag_key, tag_value, n_tag);
- *line_slot = table_line;
for(i = 0; i < table->column_cnt; i++)
{
@@ -580,7 +593,7 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance
ret_metric_id_list.id[ret_metric_id_list.count ++] = metric_id;
}
-
+ *line_slot = table_line;
return ret_metric_id_list;
}
@@ -727,7 +740,6 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i
metric_id = atomic_inc(&instance->metric_cnt) - 1;
metric_slot = read_metric_slot(instance, metric_id);
metric = metric_new(type,field_name,tag_key,tag_value,n_tag);
- *metric_slot = metric;
metric->histogram.highest_trackable_value = (int64_t)highest_trackable_value;
metric->histogram.lowest_trackable_value = (int64_t)lowest_trackable_value;
@@ -750,6 +762,7 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i
default:
break;
}
+ *metric_slot = metric;
return metric_id;
}
diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h
index e617787..5883a24 100644
--- a/src/fieldstat_internal.h
+++ b/src/fieldstat_internal.h
@@ -229,3 +229,4 @@ int fieldstat_output_file(struct fieldstat_instance *instance,long long interval
struct table_line * read_table_line(struct table_metric *table, int line_id);
int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len);
int line_protocol_output(struct fieldstat_instance *instance);
+void get_current_table_line_cnt(struct fieldstat_instance *instance, int n_table, int *table_line_cnt); \ No newline at end of file
diff --git a/src/file_output.cpp b/src/file_output.cpp
index f7aa863..dceb370 100644
--- a/src/file_output.cpp
+++ b/src/file_output.cpp
@@ -41,7 +41,7 @@ static int print_buf_tag_append_position(char *tag_key[], char *tag_value[], siz
return append_pos - print_buf_tags;
}
-static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size)
+static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,int current_metric_cnt,long long interval_ms,char *print_buf, unsigned int size)
{
int i = 0, j = 0;
//display_manifest_t* p = NULL;
@@ -52,10 +52,14 @@ static int output_file_format_default_type_gauge(struct fieldstat_instance *inst
char *append_pos = print_buf;
char print_buf_tags[1024];
- for(i = 0; i < instance->metric_cnt; i++)
+ for(i = 0; i < current_metric_cnt; i++)
{
//metric = instance->metric[i];
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->field_type != FIELD_TYPE_GAUGE)
{
continue;
@@ -108,7 +112,7 @@ static int output_file_format_default_type_gauge(struct fieldstat_instance *inst
}
-static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size)
+static int output_file_format_default_type_counter(struct fieldstat_instance *instance, int current_metric_cnt, long long interval_ms,char*print_buf, unsigned int size)
{
int i=0,j=0;
//display_manifest_t* p=NULL;
@@ -120,11 +124,15 @@ static int output_file_format_default_type_counter(struct fieldstat_instance *in
int metric_cnt = 0;
char print_buf_tags[1024];
- for(i = 0;i < instance->metric_cnt; i++)
+ for(i = 0;i < current_metric_cnt; i++)
{
//p=_handle->display[i];
//metric = instance->metric[i];
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->field_type != FIELD_TYPE_COUNTER)
{
continue;
@@ -211,7 +219,7 @@ static int output_file_format_default_type_counter(struct fieldstat_instance *in
return append_pos - print_buf;
}
-static int output_file_format_default_table(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size)
+static int output_file_format_default_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt,long long interval_ms,char*print_buf, unsigned int size)
{
int i = 0, j = 0, k = 0;
struct table_metric *table = NULL;
@@ -221,7 +229,7 @@ static int output_file_format_default_table(struct fieldstat_instance *instance,
long long value = 0;
char* append_pos = print_buf;
- for(i = 0; i < instance->table_num; i++) //per table
+ for(i = 0; i < current_table_cnt; i++) //per table
{
table = instance->table_metrics[i];
append_pos += snprintf(append_pos, size - (append_pos - print_buf),"%-20s\t\t",table->name);
@@ -234,9 +242,13 @@ static int output_file_format_default_table(struct fieldstat_instance *instance,
table->column_name[j]
);
}
- for(j = 0; j < table->line_cnt; j++) //per line
+ for(j = 0; j < tables_line_cnt[i]; j++) //per line
{
line = read_table_line(table,j);
+ if(line == NULL)
+ {
+ continue;
+ }
//print table line name + tag
append_pos += snprintf(append_pos,
size - (append_pos - print_buf),
@@ -385,14 +397,14 @@ static int output_file_print_hdr_unit(struct metric_t *metric, char*print_buf, s
return pos-print_buf;
}
-static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, long long interval_ms, char*print_buf, size_t size)
+static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, int current_metric_cnt,long long interval_ms, char*print_buf, size_t size)
{
int i = 0, j = 0, metric_num = 0;
char *pos = print_buf;
//display_manifest_t* p=NULL;
struct metric_t *metric = NULL;
- struct metric_t *metric_array[INIT_STAT_FIELD_NUM] = {NULL};
- int metric_is_print[INIT_STAT_FIELD_NUM] = {0};
+ struct metric_t *metric_array[current_metric_cnt] = {NULL};
+ int metric_is_print[current_metric_cnt] = {0};
if(instance->histogram_cnt == 0
&& instance->summary_cnt == 0)
@@ -400,9 +412,13 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta
return 0;
}
- for(i = 0; i < instance->metric_cnt; i ++)
+ for(i = 0; i < current_metric_cnt; i ++)
{
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->field_type != FIELD_TYPE_SUMMARY
&& metric->field_type != FILED_TYPE_HISTOGRAM)
{
@@ -450,7 +466,11 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta
int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms)
{
- size_t print_buf_sz = instance->metric_cnt*1024;
+ int tables_line_cnt[TABLE_MAX_NUM];
+ int current_table_cnt = instance->table_num;
+ get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt);
+ int current_metric_cnt = instance->metric_cnt;
+ size_t print_buf_sz = current_metric_cnt*1024;
char *print_buf = NULL;
char *append_pos = NULL;
time_t current = 0;
@@ -478,10 +498,10 @@ int fieldstat_output_file(struct fieldstat_instance *instance,long long interval
append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf),"%s\n",draw_boundary);
//pthread_mutex_lock(&(_handle->reg_lock)); //TODO
- append_pos += output_file_format_default_type_gauge(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
- append_pos += output_file_format_default_type_counter(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
- append_pos += output_file_format_default_table(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
- append_pos += output_file_format_default_type_histogram_and_summary(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_type_gauge(instance, current_metric_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_type_counter(instance, current_metric_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_table(instance, tables_line_cnt, current_table_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_type_histogram_and_summary(instance,current_metric_cnt,interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
//TODO output table,output histogram,output summary
//pthread_mutex_unlock(&(_handle->reg_lock));//TODO
}
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index 5d820c0..f8b3108 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -56,7 +56,7 @@ static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[],
}
-static void output_line_protocol_table(struct fieldstat_instance *instance)
+static void output_line_protocol_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt)
{
int i = 0, j = 0, k = 0;
metric_t *metric = NULL;
@@ -72,23 +72,21 @@ static void output_line_protocol_table(struct fieldstat_instance *instance)
struct table_metric *table = NULL;
struct table_line *line = NULL;
+ int line_value_is_not_zero = 0;
- for(i = 0; i < instance->table_num; i++)
+ for(i = 0; i < current_table_cnt; i++)
{
table = instance->table_metrics[i];
- for(j = 0; j < table->line_cnt; j++)
+ for(j = 0; j < tables_line_cnt[i]; j++)
{
line = read_table_line(table, j);
- tag_pos += snprintf(tag_pos,
- sizeof(tag_set_buff) - (tag_pos - tag_set_buff),
- ",app_name=%s,table_name=%s",
- instance->name,
- table->name
- );
-
- tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos,
- sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
-
+ if(line == NULL)
+ {
+ continue;
+ }
+ line_value_is_not_zero = 0;
+ tag_pos = tag_set_buff;
+ field_pos = field_set_buff;
for(k = 0; k < table->column_cnt; k ++)
{
metric = get_metric(instance, line->metric_id_belong_to_line[k]);
@@ -96,7 +94,10 @@ static void output_line_protocol_table(struct fieldstat_instance *instance)
value = metric->field_type == FIELD_TYPE_GAUGE ?
get_metric_unit_val(metric, FS_CALC_CURRENT, 1):
get_metric_unit_val(metric, FS_CALC_SPEED, 1);
-
+ if(value != 0)
+ {
+ line_value_is_not_zero = 1;
+ }
field_pos += snprintf(field_pos,
sizeof(field_set_buff) - (field_set_buff - field_set_buff),
"%s=%lld,",
@@ -105,15 +106,28 @@ static void output_line_protocol_table(struct fieldstat_instance *instance)
);
}
+ if(line_value_is_not_zero == 0)
+ {
+ continue;
+ }
+
+ tag_pos += snprintf(tag_pos,
+ sizeof(tag_set_buff) - (tag_pos - tag_set_buff),
+ ",app_name=%s,table_name=%s",
+ instance->name,
+ table->name
+ );
+
+ tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos,
+ sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
+
+
if(field_pos - field_set_buff > 0)
{
*(field_pos - 1) = '\0';
}
// measurement,tag_set field_set
append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
- tag_pos = tag_set_buff;
- field_pos = field_set_buff;
-
}
}
@@ -130,10 +144,20 @@ int line_protocol_output(struct fieldstat_instance *instance)
memset(field_set_buff, 0, sizeof(field_set_buff));
memset(tag_set_buff, 0, sizeof(tag_set_buff));
char *tag_pos = tag_set_buff;
-
- for(i = 0; i < instance->metric_cnt; i++)
+ //print current time instance start
+ int tables_line_cnt[TABLE_MAX_NUM];
+ int current_table_cnt = instance->table_num;
+ get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt);
+ int current_metric_cnt = instance->metric_cnt;
+ //print current time instance end
+
+ for(i = 0; i < current_metric_cnt; i++)
{
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->is_ratio == 1)
{
continue;
@@ -183,7 +207,7 @@ int line_protocol_output(struct fieldstat_instance *instance)
}
}
- output_line_protocol_table(instance);
+ output_line_protocol_table(instance,tables_line_cnt,current_table_cnt);
flush_line_protocol_metric(instance);
return 0;
diff --git a/src/prometheus_output.cpp b/src/prometheus_output.cpp
index c3ee470..1d4fbfd 100644
--- a/src/prometheus_output.cpp
+++ b/src/prometheus_output.cpp
@@ -241,6 +241,20 @@ static int prometheus_output_histogram_and_summary(struct metric_t *metric, char
return used_len;
}
+static int is_output_prometheus(struct metric_t *metric)
+{
+ int i = 0, ret = 0;
+ for(i = 0; i < (int)metric->n_tag; i++)
+ {
+ if(strcmp(metric->tag_key[i], "disable_output_prometheus") == 0
+ && strcmp(metric->tag_value[i], "yes") == 0)
+ {
+ ret = 1;
+ break;
+ }
+ }
+ return ret;
+}
static int prometheus_get_instance_metric_playload(struct fieldstat_instance *instance, char **payload, int *payload_size, int offset)
@@ -254,6 +268,7 @@ static int prometheus_get_instance_metric_playload(struct fieldstat_instance *in
int append_offset = offset;
char *new_payload = NULL;
int new_payload_size = 0;
+ int current_metric_cnt = instance->metric_cnt;
if(instance->running != 1)
{
@@ -270,10 +285,17 @@ static int prometheus_get_instance_metric_playload(struct fieldstat_instance *in
str_unescape(instance->name, instance_name, sizeof(instance_name));
- for(i = 0; i < instance->metric_cnt; i++)
+ for(i = 0; i < current_metric_cnt; i++)
{
metric = get_metric(instance, i);
-
+ if(metric == NULL)
+ {
+ continue;
+ }
+ if(is_output_prometheus(metric) == 1)
+ {
+ continue;
+ }
if(metric->is_ratio == 1)
{
continue;
diff --git a/test/fieldstat_test.cpp b/test/fieldstat_test.cpp
index 34e16fe..4765298 100644
--- a/test/fieldstat_test.cpp
+++ b/test/fieldstat_test.cpp
@@ -28,6 +28,8 @@ struct thread_para
int loops;
struct fieldstat_instance * instance;
int thread_id;
+ int shaping_table_id;
+ int sce_table_id;
};
static void* worker_thread(void* arg)
@@ -66,6 +68,52 @@ static void* worker_thread(void* arg)
return NULL;
}
+static void* dynamic_register(void* arg)
+{
+ char name[32] = {0};
+ struct thread_para* para=(struct thread_para*)arg;
+ int loops = para->loops;
+ struct fieldstat_instance *instance = para->instance;
+ int shaping_table_id = para->shaping_table_id;
+ int sce_table_id = para->sce_table_id;
+ const char * bins_htr = "10,20,30,40,50,60,70,80,90";
+ const char * bins_sar = "0.1,0.5,0.8,0.9,0.95,0.99";
+ const char * tags_key[] = {"disable_output_prometheus"};
+ const char * tags_value[] = {"yes"};
+
+ while (loops > 0)
+ {
+ loops--;
+ //fieldstat_value_incrby(instance,g_counter_id[i], i + 1);
+ snprintf(name, sizeof(name), "counter_%d_%d", loops, rand()%10000);
+ fieldstat_register(instance,FIELD_TYPE_COUNTER,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]));
+
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "gauge_%d_%d", loops, rand()%10000);
+ fieldstat_register(instance,FIELD_TYPE_GAUGE, name, tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]));
+
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "summary_%d_%d", loops, rand()%10000);
+ fieldstat_register_histogram(instance,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]),bins_htr,1,10000,2);
+
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "histogram_%d_%d", loops, rand()%10000);
+ fieldstat_register_summary(instance,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]),bins_sar,1,10000,2);
+
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "shp_%d_%d", loops, rand()%10000);
+ fieldstat_register_table_metrics(instance,shaping_table_id,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]));
+
+ memset(name, 0, sizeof(name));
+ snprintf(name, sizeof(name), "sce_%d_%d", loops, rand()%10000);
+ fieldstat_register_table_metrics(instance,sce_table_id,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]));
+
+ sleep(1);
+ }
+ return NULL;
+}
+
+
int set_instance_parameter(struct fieldstat_instance *instance)
{
@@ -264,19 +312,32 @@ int main(int argc, char *argv[])
int thread_num=100;
pthread_t threads[thread_num];
+
+ pthread_t threads_reg[thread_num];
+
struct thread_para para;
+ const char *shaping_column[] = {"in_rx_pkts","in_tx_pkts"};
+ enum field_type shaping_type[] = {FIELD_TYPE_COUNTER,FIELD_TYPE_GAUGE};
+ para.shaping_table_id = fieldstat_register_table(test_instance,"shaping",shaping_column,shaping_type,sizeof(shaping_column)/sizeof(shaping_column[0]));
+ const char *sce_column[] = {"sent_bytes","recv_pkts"};
+ enum field_type sce_type[] = {FIELD_TYPE_COUNTER,FIELD_TYPE_COUNTER};
+ para.sce_table_id = fieldstat_register_table(test_instance, "sce", sce_column,sce_type, sizeof(sce_column)/sizeof(sce_column[0]));
para.loops=10;
para.instance= test_instance;
for(int i=0; i<thread_num; i++)
{
pthread_create(&(threads[i]), NULL, worker_thread, &para);
+ pthread_create(&(threads_reg[i]), NULL, dynamic_register, &para);
}
void *temp;
for(int i=0; i<thread_num; i++)
{
pthread_join(threads[i], (void**)&temp);
+ pthread_join(threads_reg[i], (void**)&temp);
}
- //fieldstat_passive_output(test_instance);
+ fieldstat_passive_output(test_instance);
+ sleep(1);
+ fieldstat_passive_output(test_instance);
sleep(1000);
return 0;