diff options
| author | fumingwei <[email protected]> | 2023-03-14 16:34:18 +0800 |
|---|---|---|
| committer | fumingwei <[email protected]> | 2023-03-14 16:34:18 +0800 |
| commit | 9dbf155d1f115d588b88ffb5eac86e08e2222c1f (patch) | |
| tree | 5c3a40277ed10879a101d41593aee9e8e93f0377 | |
| parent | 13638b967b68b4f60a487ce2ec12c5c7b81d9ba6 (diff) | |
feature:过滤lineprotocol和prometheus输出
| -rw-r--r-- | src/fieldstat.cpp | 23 | ||||
| -rw-r--r-- | src/fieldstat_internal.h | 1 | ||||
| -rw-r--r-- | src/file_output.cpp | 52 | ||||
| -rw-r--r-- | src/line_protocol_output.cpp | 64 | ||||
| -rw-r--r-- | src/prometheus_output.cpp | 26 | ||||
| -rw-r--r-- | test/fieldstat_test.cpp | 63 |
6 files changed, 185 insertions, 44 deletions
diff --git a/src/fieldstat.cpp b/src/fieldstat.cpp index d1bf882..a50cb03 100644 --- a/src/fieldstat.cpp +++ b/src/fieldstat.cpp @@ -49,6 +49,18 @@ int is_valid_field_name(const char* name) } +void get_current_table_line_cnt(struct fieldstat_instance *instance, int n_table, int *tables_line_cnt) +{ + for(int i = 0; i < n_table; i++) + { + if(instance->table_metrics[i] == NULL) + { + continue; + } + tables_line_cnt[i] = instance->table_metrics[i]->line_cnt; + } +} + struct metric_t * get_metric(struct fieldstat_instance *instance, int metric_id) { int block_index = 0; @@ -316,7 +328,6 @@ int fieldstat_register(struct fieldstat_instance *instance, enum field_type type metric_id = atomic_inc(&instance->metric_cnt) - 1; metric_slot = read_metric_slot(instance, metric_id); metric = metric_new(type,field_name,tag_key,tag_value,n_tag); - *metric_slot = metric; switch(type) { @@ -329,6 +340,7 @@ int fieldstat_register(struct fieldstat_instance *instance, enum field_type type default: assert(0); } + *metric_slot = metric; return metric_id; } @@ -347,13 +359,15 @@ long long get_metric_unit_val(struct metric_t *metric,enum field_calc_algo calc_ default: break; } + value = threadsafe_counter_read(&(target->changing)); //value= threadsafe_counter_read(&(target->changing)); if(is_refer == 0) { target->previous_changed = value; target->accumulated += value; - threadsafe_counter_set(&(target->changing), 0); +// threadsafe_counter_set(&(target->changing), 0); + threadsafe_counter_sub(&(target->changing), value); } switch(calc_type) { @@ -564,7 +578,6 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance line_id = atomic_inc(&(table->line_cnt)) - 1; line_slot = read_table_line_slot(table,line_id); table_line = table_line_new(line_name, tag_key, tag_value, n_tag); - *line_slot = table_line; for(i = 0; i < table->column_cnt; i++) { @@ -580,7 +593,7 @@ struct metric_id_list fieldstat_register_table_metrics(struct fieldstat_instance ret_metric_id_list.id[ret_metric_id_list.count ++] = metric_id; } - + *line_slot = table_line; return ret_metric_id_list; } @@ -727,7 +740,6 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i metric_id = atomic_inc(&instance->metric_cnt) - 1; metric_slot = read_metric_slot(instance, metric_id); metric = metric_new(type,field_name,tag_key,tag_value,n_tag); - *metric_slot = metric; metric->histogram.highest_trackable_value = (int64_t)highest_trackable_value; metric->histogram.lowest_trackable_value = (int64_t)lowest_trackable_value; @@ -750,6 +762,7 @@ static int fieldstat_register_histogram_and_summary(struct fieldstat_instance *i default: break; } + *metric_slot = metric; return metric_id; } diff --git a/src/fieldstat_internal.h b/src/fieldstat_internal.h index e617787..5883a24 100644 --- a/src/fieldstat_internal.h +++ b/src/fieldstat_internal.h @@ -229,3 +229,4 @@ int fieldstat_output_file(struct fieldstat_instance *instance,long long interval struct table_line * read_table_line(struct table_metric *table, int line_id); int send_udp(int sd, unsigned int dest_ip, unsigned short dest_port, const char * data, int len); int line_protocol_output(struct fieldstat_instance *instance); +void get_current_table_line_cnt(struct fieldstat_instance *instance, int n_table, int *table_line_cnt);
\ No newline at end of file diff --git a/src/file_output.cpp b/src/file_output.cpp index f7aa863..dceb370 100644 --- a/src/file_output.cpp +++ b/src/file_output.cpp @@ -41,7 +41,7 @@ static int print_buf_tag_append_position(char *tag_key[], char *tag_value[], siz return append_pos - print_buf_tags; } -static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,long long interval_ms,char *print_buf, unsigned int size) +static int output_file_format_default_type_gauge(struct fieldstat_instance *instance,int current_metric_cnt,long long interval_ms,char *print_buf, unsigned int size) { int i = 0, j = 0; //display_manifest_t* p = NULL; @@ -52,10 +52,14 @@ static int output_file_format_default_type_gauge(struct fieldstat_instance *inst char *append_pos = print_buf; char print_buf_tags[1024]; - for(i = 0; i < instance->metric_cnt; i++) + for(i = 0; i < current_metric_cnt; i++) { //metric = instance->metric[i]; metric = get_metric(instance, i); + if(metric == NULL) + { + continue; + } if(metric->field_type != FIELD_TYPE_GAUGE) { continue; @@ -108,7 +112,7 @@ static int output_file_format_default_type_gauge(struct fieldstat_instance *inst } -static int output_file_format_default_type_counter(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) +static int output_file_format_default_type_counter(struct fieldstat_instance *instance, int current_metric_cnt, long long interval_ms,char*print_buf, unsigned int size) { int i=0,j=0; //display_manifest_t* p=NULL; @@ -120,11 +124,15 @@ static int output_file_format_default_type_counter(struct fieldstat_instance *in int metric_cnt = 0; char print_buf_tags[1024]; - for(i = 0;i < instance->metric_cnt; i++) + for(i = 0;i < current_metric_cnt; i++) { //p=_handle->display[i]; //metric = instance->metric[i]; metric = get_metric(instance, i); + if(metric == NULL) + { + continue; + } if(metric->field_type != FIELD_TYPE_COUNTER) { continue; @@ -211,7 +219,7 @@ static int output_file_format_default_type_counter(struct fieldstat_instance *in return append_pos - print_buf; } -static int output_file_format_default_table(struct fieldstat_instance *instance,long long interval_ms,char*print_buf, unsigned int size) +static int output_file_format_default_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt,long long interval_ms,char*print_buf, unsigned int size) { int i = 0, j = 0, k = 0; struct table_metric *table = NULL; @@ -221,7 +229,7 @@ static int output_file_format_default_table(struct fieldstat_instance *instance, long long value = 0; char* append_pos = print_buf; - for(i = 0; i < instance->table_num; i++) //per table + for(i = 0; i < current_table_cnt; i++) //per table { table = instance->table_metrics[i]; append_pos += snprintf(append_pos, size - (append_pos - print_buf),"%-20s\t\t",table->name); @@ -234,9 +242,13 @@ static int output_file_format_default_table(struct fieldstat_instance *instance, table->column_name[j] ); } - for(j = 0; j < table->line_cnt; j++) //per line + for(j = 0; j < tables_line_cnt[i]; j++) //per line { line = read_table_line(table,j); + if(line == NULL) + { + continue; + } //print table line name + tag append_pos += snprintf(append_pos, size - (append_pos - print_buf), @@ -385,14 +397,14 @@ static int output_file_print_hdr_unit(struct metric_t *metric, char*print_buf, s return pos-print_buf; } -static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, long long interval_ms, char*print_buf, size_t size) +static int output_file_format_default_type_histogram_and_summary(struct fieldstat_instance *instance, int current_metric_cnt,long long interval_ms, char*print_buf, size_t size) { int i = 0, j = 0, metric_num = 0; char *pos = print_buf; //display_manifest_t* p=NULL; struct metric_t *metric = NULL; - struct metric_t *metric_array[INIT_STAT_FIELD_NUM] = {NULL}; - int metric_is_print[INIT_STAT_FIELD_NUM] = {0}; + struct metric_t *metric_array[current_metric_cnt] = {NULL}; + int metric_is_print[current_metric_cnt] = {0}; if(instance->histogram_cnt == 0 && instance->summary_cnt == 0) @@ -400,9 +412,13 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta return 0; } - for(i = 0; i < instance->metric_cnt; i ++) + for(i = 0; i < current_metric_cnt; i ++) { metric = get_metric(instance, i); + if(metric == NULL) + { + continue; + } if(metric->field_type != FIELD_TYPE_SUMMARY && metric->field_type != FILED_TYPE_HISTOGRAM) { @@ -450,7 +466,11 @@ static int output_file_format_default_type_histogram_and_summary(struct fieldsta int fieldstat_output_file(struct fieldstat_instance *instance,long long interval_ms) { - size_t print_buf_sz = instance->metric_cnt*1024; + int tables_line_cnt[TABLE_MAX_NUM]; + int current_table_cnt = instance->table_num; + get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt); + int current_metric_cnt = instance->metric_cnt; + size_t print_buf_sz = current_metric_cnt*1024; char *print_buf = NULL; char *append_pos = NULL; time_t current = 0; @@ -478,10 +498,10 @@ int fieldstat_output_file(struct fieldstat_instance *instance,long long interval append_pos += snprintf(append_pos, print_buf_sz - (append_pos - print_buf),"%s\n",draw_boundary); //pthread_mutex_lock(&(_handle->reg_lock)); //TODO - append_pos += output_file_format_default_type_gauge(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - append_pos += output_file_format_default_type_counter(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - append_pos += output_file_format_default_table(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); - append_pos += output_file_format_default_type_histogram_and_summary(instance, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_type_gauge(instance, current_metric_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_type_counter(instance, current_metric_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_table(instance, tables_line_cnt, current_table_cnt, interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); + append_pos += output_file_format_default_type_histogram_and_summary(instance,current_metric_cnt,interval_ms, append_pos, print_buf_sz - (append_pos - print_buf)); //TODO output table,output histogram,output summary //pthread_mutex_unlock(&(_handle->reg_lock));//TODO } diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp index 5d820c0..f8b3108 100644 --- a/src/line_protocol_output.cpp +++ b/src/line_protocol_output.cpp @@ -56,7 +56,7 @@ static int output_line_protocol_tag_set_buf(char *tag_key[], char *tag_value[], } -static void output_line_protocol_table(struct fieldstat_instance *instance) +static void output_line_protocol_table(struct fieldstat_instance *instance,int tables_line_cnt[], int current_table_cnt) { int i = 0, j = 0, k = 0; metric_t *metric = NULL; @@ -72,23 +72,21 @@ static void output_line_protocol_table(struct fieldstat_instance *instance) struct table_metric *table = NULL; struct table_line *line = NULL; + int line_value_is_not_zero = 0; - for(i = 0; i < instance->table_num; i++) + for(i = 0; i < current_table_cnt; i++) { table = instance->table_metrics[i]; - for(j = 0; j < table->line_cnt; j++) + for(j = 0; j < tables_line_cnt[i]; j++) { line = read_table_line(table, j); - tag_pos += snprintf(tag_pos, - sizeof(tag_set_buff) - (tag_pos - tag_set_buff), - ",app_name=%s,table_name=%s", - instance->name, - table->name - ); - - tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos, - sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - + if(line == NULL) + { + continue; + } + line_value_is_not_zero = 0; + tag_pos = tag_set_buff; + field_pos = field_set_buff; for(k = 0; k < table->column_cnt; k ++) { metric = get_metric(instance, line->metric_id_belong_to_line[k]); @@ -96,7 +94,10 @@ static void output_line_protocol_table(struct fieldstat_instance *instance) value = metric->field_type == FIELD_TYPE_GAUGE ? get_metric_unit_val(metric, FS_CALC_CURRENT, 1): get_metric_unit_val(metric, FS_CALC_SPEED, 1); - + if(value != 0) + { + line_value_is_not_zero = 1; + } field_pos += snprintf(field_pos, sizeof(field_set_buff) - (field_set_buff - field_set_buff), "%s=%lld,", @@ -105,15 +106,28 @@ static void output_line_protocol_table(struct fieldstat_instance *instance) ); } + if(line_value_is_not_zero == 0) + { + continue; + } + + tag_pos += snprintf(tag_pos, + sizeof(tag_set_buff) - (tag_pos - tag_set_buff), + ",app_name=%s,table_name=%s", + instance->name, + table->name + ); + + tag_pos += output_line_protocol_tag_set_buf(line->tag_key, line->tag_value, line->n_tag, tag_pos, + sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); + + if(field_pos - field_set_buff > 0) { *(field_pos - 1) = '\0'; } // measurement,tag_set field_set append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); - tag_pos = tag_set_buff; - field_pos = field_set_buff; - } } @@ -130,10 +144,20 @@ int line_protocol_output(struct fieldstat_instance *instance) memset(field_set_buff, 0, sizeof(field_set_buff)); memset(tag_set_buff, 0, sizeof(tag_set_buff)); char *tag_pos = tag_set_buff; - - for(i = 0; i < instance->metric_cnt; i++) + //print current time instance start + int tables_line_cnt[TABLE_MAX_NUM]; + int current_table_cnt = instance->table_num; + get_current_table_line_cnt(instance, current_table_cnt, tables_line_cnt); + int current_metric_cnt = instance->metric_cnt; + //print current time instance end + + for(i = 0; i < current_metric_cnt; i++) { metric = get_metric(instance, i); + if(metric == NULL) + { + continue; + } if(metric->is_ratio == 1) { continue; @@ -183,7 +207,7 @@ int line_protocol_output(struct fieldstat_instance *instance) } } - output_line_protocol_table(instance); + output_line_protocol_table(instance,tables_line_cnt,current_table_cnt); flush_line_protocol_metric(instance); return 0; diff --git a/src/prometheus_output.cpp b/src/prometheus_output.cpp index c3ee470..1d4fbfd 100644 --- a/src/prometheus_output.cpp +++ b/src/prometheus_output.cpp @@ -241,6 +241,20 @@ static int prometheus_output_histogram_and_summary(struct metric_t *metric, char return used_len; } +static int is_output_prometheus(struct metric_t *metric) +{ + int i = 0, ret = 0; + for(i = 0; i < (int)metric->n_tag; i++) + { + if(strcmp(metric->tag_key[i], "disable_output_prometheus") == 0 + && strcmp(metric->tag_value[i], "yes") == 0) + { + ret = 1; + break; + } + } + return ret; +} static int prometheus_get_instance_metric_playload(struct fieldstat_instance *instance, char **payload, int *payload_size, int offset) @@ -254,6 +268,7 @@ static int prometheus_get_instance_metric_playload(struct fieldstat_instance *in int append_offset = offset; char *new_payload = NULL; int new_payload_size = 0; + int current_metric_cnt = instance->metric_cnt; if(instance->running != 1) { @@ -270,10 +285,17 @@ static int prometheus_get_instance_metric_playload(struct fieldstat_instance *in str_unescape(instance->name, instance_name, sizeof(instance_name)); - for(i = 0; i < instance->metric_cnt; i++) + for(i = 0; i < current_metric_cnt; i++) { metric = get_metric(instance, i); - + if(metric == NULL) + { + continue; + } + if(is_output_prometheus(metric) == 1) + { + continue; + } if(metric->is_ratio == 1) { continue; diff --git a/test/fieldstat_test.cpp b/test/fieldstat_test.cpp index 34e16fe..4765298 100644 --- a/test/fieldstat_test.cpp +++ b/test/fieldstat_test.cpp @@ -28,6 +28,8 @@ struct thread_para int loops; struct fieldstat_instance * instance; int thread_id; + int shaping_table_id; + int sce_table_id; }; static void* worker_thread(void* arg) @@ -66,6 +68,52 @@ static void* worker_thread(void* arg) return NULL; } +static void* dynamic_register(void* arg) +{ + char name[32] = {0}; + struct thread_para* para=(struct thread_para*)arg; + int loops = para->loops; + struct fieldstat_instance *instance = para->instance; + int shaping_table_id = para->shaping_table_id; + int sce_table_id = para->sce_table_id; + const char * bins_htr = "10,20,30,40,50,60,70,80,90"; + const char * bins_sar = "0.1,0.5,0.8,0.9,0.95,0.99"; + const char * tags_key[] = {"disable_output_prometheus"}; + const char * tags_value[] = {"yes"}; + + while (loops > 0) + { + loops--; + //fieldstat_value_incrby(instance,g_counter_id[i], i + 1); + snprintf(name, sizeof(name), "counter_%d_%d", loops, rand()%10000); + fieldstat_register(instance,FIELD_TYPE_COUNTER,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0])); + + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "gauge_%d_%d", loops, rand()%10000); + fieldstat_register(instance,FIELD_TYPE_GAUGE, name, tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0])); + + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "summary_%d_%d", loops, rand()%10000); + fieldstat_register_histogram(instance,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]),bins_htr,1,10000,2); + + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "histogram_%d_%d", loops, rand()%10000); + fieldstat_register_summary(instance,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0]),bins_sar,1,10000,2); + + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "shp_%d_%d", loops, rand()%10000); + fieldstat_register_table_metrics(instance,shaping_table_id,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0])); + + memset(name, 0, sizeof(name)); + snprintf(name, sizeof(name), "sce_%d_%d", loops, rand()%10000); + fieldstat_register_table_metrics(instance,sce_table_id,name,tags_key,tags_value,sizeof(tags_key)/sizeof(tags_key[0])); + + sleep(1); + } + return NULL; +} + + int set_instance_parameter(struct fieldstat_instance *instance) { @@ -264,19 +312,32 @@ int main(int argc, char *argv[]) int thread_num=100; pthread_t threads[thread_num]; + + pthread_t threads_reg[thread_num]; + struct thread_para para; + const char *shaping_column[] = {"in_rx_pkts","in_tx_pkts"}; + enum field_type shaping_type[] = {FIELD_TYPE_COUNTER,FIELD_TYPE_GAUGE}; + para.shaping_table_id = fieldstat_register_table(test_instance,"shaping",shaping_column,shaping_type,sizeof(shaping_column)/sizeof(shaping_column[0])); + const char *sce_column[] = {"sent_bytes","recv_pkts"}; + enum field_type sce_type[] = {FIELD_TYPE_COUNTER,FIELD_TYPE_COUNTER}; + para.sce_table_id = fieldstat_register_table(test_instance, "sce", sce_column,sce_type, sizeof(sce_column)/sizeof(sce_column[0])); para.loops=10; para.instance= test_instance; for(int i=0; i<thread_num; i++) { pthread_create(&(threads[i]), NULL, worker_thread, ¶); + pthread_create(&(threads_reg[i]), NULL, dynamic_register, ¶); } void *temp; for(int i=0; i<thread_num; i++) { pthread_join(threads[i], (void**)&temp); + pthread_join(threads_reg[i], (void**)&temp); } - //fieldstat_passive_output(test_instance); + fieldstat_passive_output(test_instance); + sleep(1); + fieldstat_passive_output(test_instance); sleep(1000); return 0; |
