summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorfumingwei <[email protected]>2023-03-14 16:34:18 +0800
committerfumingwei <[email protected]>2023-03-14 16:34:18 +0800
commit9dbf155d1f115d588b88ffb5eac86e08e2222c1f (patch)
tree5c3a40277ed10879a101d41593aee9e8e93f0377 /src
parent13638b967b68b4f60a487ce2ec12c5c7b81d9ba6 (diff)
feature:过滤lineprotocol和prometheus输出
Diffstat (limited to 'src')
-rw-r--r--src/fieldstat.cpp23
-rw-r--r--src/fieldstat_internal.h1
-rw-r--r--src/file_output.cpp52
-rw-r--r--src/line_protocol_output.cpp64
-rw-r--r--src/prometheus_output.cpp26
5 files changed, 123 insertions, 43 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp
index d1bf882..a50cb03 100644
--- a/src/fieldstat.cpp
+++ b/src/fieldstat.cpp
@@ -49,6 +49,18 @@ int is_valid_field_name(const char* name)
}
+void get_current_table_line_cnt(struct fieldstat_instance *instance, int n_table, int *tables_line_cnt)
+{
+ for(int i = 0; i < n_table; i++)
+ {
+ if(instance->table_metrics[i] == NULL)
+ {
+ continue;
+ }
+ tables_line_cnt[i] = instance->table_metrics[i]->line_cnt;
+ }
+}
+
struct metric_t * get_metric(struct fieldstat_instance *instance, int metric_id)
{
int block_index = 0;
@@ -316,7 +328,6 @@ int fieldstat_register(struct fieldstat_instance *instance, enum field_type type
metric_id = atomic_inc(&instance->metric_cnt) - 1;
metric_slot = read_metric_slot(instance, metric_id);
metric = metric_new(type,field_name,tag_key,tag_value,n_tag);
- *metric_slot = metric;
switch(type)
{
@@ -329,6 +340,7 @@ int fieldstat_register(struct fieldstat_instance *instance, enum field_type type
default:
assert(0);
}
+ *metric_slot = metric;
return metric_id;
}
@@ -347,13 +359,15 @@ long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_
default:
break;
}
+
value = threadsafe_counter_read(&(target->changing));
//value= threadsafe_counter_read(&(target->changing));
if(is_refer == 0)
{
target->previous_changed = value;
target->accumulated += value;
- threadsafe_counter_set(&(target->changing), 0);
+// threadsafe_counter_set(&(target->changing), 0);
+ threadsafe_counter_sub(&(target->changing), value);
}
switch(calc_type)
{
@@ -564,7 +578,6 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance
line_id = atomic_inc(&(table->line_cnt)) - 1;
line_slot = read_table_line_slot(table,line_id);
table_line = table_line_new(line_name, tag_key, tag_value, n_tag);
- *line_slot = table_line;
for(i = 0; i < table->column_cnt; i++)
{
@@ -580,7 +593,7 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance
ret_metric_id_list.id[ret_metric_id_list.count ++] = metric_id;
}
-
+ *line_slot = table_line;
return ret_metric_id_list;
}
@@ -727,7 +740,6 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i
metric_id = atomic_inc(&instance->metric_cnt) - 1;
metric_slot = read_metric_slot(instance, metric_id);
metric = metric_new(type,field_name,tag_key,tag_value,n_tag);
- *metric_slot = metric;
metric->histogram.highest_trackable_value = (int64_t)highest_trackable_value;
metric->histogram.lowest_trackable_value = (int64_t)lowest_trackable_value;
@@ -750,6 +762,7 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i
default:
break;
}
+ *metric_slot = metric;
return metric_id;
}
diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h
index e617787..5883a24 100644
--- a/src/fieldstat_internal.h
+++ b/src/fieldstat_internal.h
@@ -229,3 +229,4 @@ int fieldstat_output_file(struct fieldstat_instance *instance,long long interval
struct table_line * read_table_line(struct table_metric *table, int line_id);
int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len);
int line_protocol_output(struct fieldstat_instance *instance);
+void get_current_table_line_cnt(struct fieldstat_instance *instance, int n_table, int *table_line_cnt); \ No newline at end of file
diff --git a/src/file_output.cpp b/src/file_output.cpp
index f7aa863..dceb370 100644
--- a/src/file_output.cpp
+++ b/src/file_output.cpp
@@ -41,7 +41,7 @@ static int print_buf_tag_append_position(char *tag_key[], char *tag_value[], siz
return append_pos - print_buf_tags;
}
-static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size)
+static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,int current_metric_cnt,long long interval_ms,char *print_buf, unsigned int size)
{
int i = 0, j = 0;
//display_manifest_t* p = NULL;
@@ -52,10 +52,14 @@ static int output_file_format_default_type_gauge(struct fieldstat_instance *inst
char *append_pos = print_buf;
char print_buf_tags[1024];
- for(i = 0; i < instance->metric_cnt; i++)
+ for(i = 0; i < current_metric_cnt; i++)
{
//metric = instance->metric[i];
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->field_type != FIELD_TYPE_GAUGE)
{
continue;
@@ -108,7 +112,7 @@ static int output_file_format_default_type_gauge(struct fieldstat_instance *inst
}
-static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size)
+static int output_file_format_default_type_counter(struct fieldstat_instance *instance, int current_metric_cnt, long long interval_ms,char*print_buf, unsigned int size)
{
int i=0,j=0;
//display_manifest_t* p=NULL;
@@ -120,11 +124,15 @@ static int output_file_format_default_type_counter(struct fieldstat_instance *in
int metric_cnt = 0;
char print_buf_tags[1024];
- for(i = 0;i < instance->metric_cnt; i++)
+ for(i = 0;i < current_metric_cnt; i++)
{
//p=_handle->display[i];
//metric = instance->metric[i];
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->field_type != FIELD_TYPE_COUNTER)
{
continue;
@@ -211,7 +219,7 @@ static int output_file_format_default_type_counter(struct fieldstat_instance *in
return append_pos - print_buf;
}
-static int output_file_format_default_table(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size)
+static int output_file_format_default_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt,long long interval_ms,char*print_buf, unsigned int size)
{
int i = 0, j = 0, k = 0;
struct table_metric *table = NULL;
@@ -221,7 +229,7 @@ static int output_file_format_default_table(struct fieldstat_instance *instance,
long long value = 0;
char* append_pos = print_buf;
- for(i = 0; i < instance->table_num; i++) //per table
+ for(i = 0; i < current_table_cnt; i++) //per table
{
table = instance->table_metrics[i];
append_pos += snprintf(append_pos, size - (append_pos - print_buf),"%-20s\t\t",table->name);
@@ -234,9 +242,13 @@ static int output_file_format_default_table(struct fieldstat_instance *instance,
table->column_name[j]
);
}
- for(j = 0; j < table->line_cnt; j++) //per line
+ for(j = 0; j < tables_line_cnt[i]; j++) //per line
{
line = read_table_line(table,j);
+ if(line == NULL)
+ {
+ continue;
+ }
//print table line name + tag
append_pos += snprintf(append_pos,
size - (append_pos - print_buf),
@@ -385,14 +397,14 @@ static int output_file_print_hdr_unit(struct metric_t *metric, char*print_buf, s
return pos-print_buf;
}
-static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, long long interval_ms, char*print_buf, size_t size)
+static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, int current_metric_cnt,long long interval_ms, char*print_buf, size_t size)
{
int i = 0, j = 0, metric_num = 0;
char *pos = print_buf;
//display_manifest_t* p=NULL;
struct metric_t *metric = NULL;
- struct metric_t *metric_array[INIT_STAT_FIELD_NUM] = {NULL};
- int metric_is_print[INIT_STAT_FIELD_NUM] = {0};
+ struct metric_t *metric_array[current_metric_cnt] = {NULL};
+ int metric_is_print[current_metric_cnt] = {0};
if(instance->histogram_cnt == 0
&& instance->summary_cnt == 0)
@@ -400,9 +412,13 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta
return 0;
}
- for(i = 0; i < instance->metric_cnt; i ++)
+ for(i = 0; i < current_metric_cnt; i ++)
{
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->field_type != FIELD_TYPE_SUMMARY
&& metric->field_type != FILED_TYPE_HISTOGRAM)
{
@@ -450,7 +466,11 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta
int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms)
{
- size_t print_buf_sz = instance->metric_cnt*1024;
+ int tables_line_cnt[TABLE_MAX_NUM];
+ int current_table_cnt = instance->table_num;
+ get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt);
+ int current_metric_cnt = instance->metric_cnt;
+ size_t print_buf_sz = current_metric_cnt*1024;
char *print_buf = NULL;
char *append_pos = NULL;
time_t current = 0;
@@ -478,10 +498,10 @@ int fieldstat_output_file(struct fieldstat_instance *instance,long long interval
append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf),"%s\n",draw_boundary);
//pthread_mutex_lock(&(_handle->reg_lock)); //TODO
- append_pos += output_file_format_default_type_gauge(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
- append_pos += output_file_format_default_type_counter(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
- append_pos += output_file_format_default_table(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
- append_pos += output_file_format_default_type_histogram_and_summary(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_type_gauge(instance, current_metric_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_type_counter(instance, current_metric_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_table(instance, tables_line_cnt, current_table_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
+ append_pos += output_file_format_default_type_histogram_and_summary(instance,current_metric_cnt,interval_ms, append_pos, print_buf_sz - (append_pos - print_buf));
//TODO output table,output histogram,output summary
//pthread_mutex_unlock(&(_handle->reg_lock));//TODO
}
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index 5d820c0..f8b3108 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -56,7 +56,7 @@ static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[],
}
-static void output_line_protocol_table(struct fieldstat_instance *instance)
+static void output_line_protocol_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt)
{
int i = 0, j = 0, k = 0;
metric_t *metric = NULL;
@@ -72,23 +72,21 @@ static void output_line_protocol_table(struct fieldstat_instance *instance)
struct table_metric *table = NULL;
struct table_line *line = NULL;
+ int line_value_is_not_zero = 0;
- for(i = 0; i < instance->table_num; i++)
+ for(i = 0; i < current_table_cnt; i++)
{
table = instance->table_metrics[i];
- for(j = 0; j < table->line_cnt; j++)
+ for(j = 0; j < tables_line_cnt[i]; j++)
{
line = read_table_line(table, j);
- tag_pos += snprintf(tag_pos,
- sizeof(tag_set_buff) - (tag_pos - tag_set_buff),
- ",app_name=%s,table_name=%s",
- instance->name,
- table->name
- );
-
- tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos,
- sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
-
+ if(line == NULL)
+ {
+ continue;
+ }
+ line_value_is_not_zero = 0;
+ tag_pos = tag_set_buff;
+ field_pos = field_set_buff;
for(k = 0; k < table->column_cnt; k ++)
{
metric = get_metric(instance, line->metric_id_belong_to_line[k]);
@@ -96,7 +94,10 @@ static void output_line_protocol_table(struct fieldstat_instance *instance)
value = metric->field_type == FIELD_TYPE_GAUGE ?
get_metric_unit_val(metric, FS_CALC_CURRENT, 1):
get_metric_unit_val(metric, FS_CALC_SPEED, 1);
-
+ if(value != 0)
+ {
+ line_value_is_not_zero = 1;
+ }
field_pos += snprintf(field_pos,
sizeof(field_set_buff) - (field_set_buff - field_set_buff),
"%s=%lld,",
@@ -105,15 +106,28 @@ static void output_line_protocol_table(struct fieldstat_instance *instance)
);
}
+ if(line_value_is_not_zero == 0)
+ {
+ continue;
+ }
+
+ tag_pos += snprintf(tag_pos,
+ sizeof(tag_set_buff) - (tag_pos - tag_set_buff),
+ ",app_name=%s,table_name=%s",
+ instance->name,
+ table->name
+ );
+
+ tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos,
+ sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
+
+
if(field_pos - field_set_buff > 0)
{
*(field_pos - 1) = '\0';
}
// measurement,tag_set field_set
append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
- tag_pos = tag_set_buff;
- field_pos = field_set_buff;
-
}
}
@@ -130,10 +144,20 @@ int line_protocol_output(struct fieldstat_instance *instance)
memset(field_set_buff, 0, sizeof(field_set_buff));
memset(tag_set_buff, 0, sizeof(tag_set_buff));
char *tag_pos = tag_set_buff;
-
- for(i = 0; i < instance->metric_cnt; i++)
+ //print current time instance start
+ int tables_line_cnt[TABLE_MAX_NUM];
+ int current_table_cnt = instance->table_num;
+ get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt);
+ int current_metric_cnt = instance->metric_cnt;
+ //print current time instance end
+
+ for(i = 0; i < current_metric_cnt; i++)
{
metric = get_metric(instance, i);
+ if(metric == NULL)
+ {
+ continue;
+ }
if(metric->is_ratio == 1)
{
continue;
@@ -183,7 +207,7 @@ int line_protocol_output(struct fieldstat_instance *instance)
}
}
- output_line_protocol_table(instance);
+ output_line_protocol_table(instance,tables_line_cnt,current_table_cnt);
flush_line_protocol_metric(instance);
return 0;
diff --git a/src/prometheus_output.cpp b/src/prometheus_output.cpp
index c3ee470..1d4fbfd 100644
--- a/src/prometheus_output.cpp
+++ b/src/prometheus_output.cpp
@@ -241,6 +241,20 @@ static int prometheus_output_histogram_and_summary(struct metric_t *metric, char
return used_len;
}
+static int is_output_prometheus(struct metric_t *metric)
+{
+ int i = 0, ret = 0;
+ for(i = 0; i < (int)metric->n_tag; i++)
+ {
+ if(strcmp(metric->tag_key[i], "disable_output_prometheus") == 0
+ && strcmp(metric->tag_value[i], "yes") == 0)
+ {
+ ret = 1;
+ break;
+ }
+ }
+ return ret;
+}
static int prometheus_get_instance_metric_playload(struct fieldstat_instance *instance, char **payload, int *payload_size, int offset)
@@ -254,6 +268,7 @@ static int prometheus_get_instance_metric_playload(struct fieldstat_instance *in
int append_offset = offset;
char *new_payload = NULL;
int new_payload_size = 0;
+ int current_metric_cnt = instance->metric_cnt;
if(instance->running != 1)
{
@@ -270,10 +285,17 @@ static int prometheus_get_instance_metric_playload(struct fieldstat_instance *in
str_unescape(instance->name, instance_name, sizeof(instance_name));
- for(i = 0; i < instance->metric_cnt; i++)
+ for(i = 0; i < current_metric_cnt; i++)
{
metric = get_metric(instance, i);
-
+ if(metric == NULL)
+ {
+ continue;
+ }
+ if(is_output_prometheus(metric) == 1)
+ {
+ continue;
+ }
if(metric->is_ratio == 1)
{
continue;