diff options
Diffstat (limited to 'src/line_protocol_output.cpp')
| -rw-r--r-- | src/line_protocol_output.cpp | 177 |
1 files changed, 161 insertions, 16 deletions
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp index 56876fe..3f4e86c 100644 --- a/src/line_protocol_output.cpp +++ b/src/line_protocol_output.cpp @@ -1,6 +1,25 @@ #include "fieldstat_internal.h" -static void flush_line_protocol_metric(struct fieldstat_instance *instance) +/* +static void flush_line_protocol_metric_send_buff(int send_socket, unsigned int server_ip, unsigned short server_port, const char *send_buff, int *send_buff_offset) +{ + int offset = *send_buff_offset; + if(offset == 0) + { + return; + } + + if(server_ip > 0 && server_port > 0) + { + send_udp(send_socket, server_ip, server_port, send_buff, offset); + } + + *send_buff_offset = 0; + memset(send_buff, 0, UDP_PAYLOAD_SIZE); +} +*/ + +static void flush_line_protocol_metric_send_buff(struct fieldstat_instance *instance) { if(instance->line_protocol_send_buff_offset == 0) { @@ -16,11 +35,11 @@ static void flush_line_protocol_metric(struct fieldstat_instance *instance) ); } instance->line_protocol_send_buff_offset = 0; - memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); + memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); return; } -static void append_line_protocol_line(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set) +static void append_line_protocol_metric_to_send_buff(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set) { if(field_set==NULL) { @@ -28,7 +47,7 @@ static void append_line_protocol_line(struct fieldstat_instance *instance, const } if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) { - flush_line_protocol_metric(instance); + flush_line_protocol_metric_send_buff(instance); } printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set); instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, @@ -65,8 +84,6 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t char field_set_buff[UDP_PAYLOAD_SIZE]; char tag_set_buff[UDP_PAYLOAD_SIZE]; - memset(field_set_buff, 0, sizeof(field_set_buff)); - memset(tag_set_buff, 0, sizeof(tag_set_buff)); char *tag_pos = tag_set_buff; char *field_pos = field_set_buff; @@ -87,6 +104,9 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t line_value_is_not_zero = 0; tag_pos = tag_set_buff; field_pos = field_set_buff; + memset(field_set_buff, 0, sizeof(field_set_buff)); + memset(tag_set_buff, 0, sizeof(tag_set_buff)); + line_value_is_not_zero = 0; for(k = 0; k < table->column_cnt; k ++) { metric = get_metric(instance, line->metric_id_belong_to_line[k]); @@ -110,7 +130,6 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t { continue; } - tag_pos += snprintf(tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff), ",app_name=%s,table_name=%s", @@ -122,12 +141,8 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - if(field_pos - field_set_buff > 0) - { - *(field_pos - 1) = '\0'; - } // measurement,tag_set field_set - append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff); } } @@ -181,7 +196,7 @@ int line_protocol_output(struct fieldstat_instance *instance) instance->name ); output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff); tag_pos = tag_set_buff; } @@ -197,7 +212,7 @@ int line_protocol_output(struct fieldstat_instance *instance) instance->name ); output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff)); - append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff); + append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff); tag_pos = tag_set_buff; } break; @@ -208,7 +223,137 @@ int line_protocol_output(struct fieldstat_instance *instance) } output_line_protocol_table(instance,tables_line_cnt,current_table_cnt); - flush_line_protocol_metric(instance); + flush_line_protocol_metric_send_buff(instance); + + return 0; +} + +static void flush_line_protocol_dynamic_metric(struct fieldstat_dynamic_instance *instance) +{ + if(instance->line_protocol_send_buff_offset == 0) + { + return; + } + + if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0) + { + send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip, + (unsigned short)instance->line_protocol_server_port, + instance->line_protocol_send_buff, + instance->line_protocol_send_buff_offset + ); + } + instance->line_protocol_send_buff_offset = 0; + memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff)); + return; +} + +static void append_line_protocol_dynamic_row(struct fieldstat_dynamic_instance *instance, const char* measurement, char *tag_set, char *field_set) +{ + if(field_set==NULL) + { + return; + } + if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2) + { + flush_line_protocol_dynamic_metric(instance); + } + printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set); + instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset, + sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset, + "%s%s %s\n", + measurement, tag_set, field_set + ); + return; +} + +int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *instance) +{ + struct dynamic_metric **head = NULL; + struct dynamic_metric *dyn_metric, *tmp_dyn_metric; + struct metric **metrics = NULL; + struct metric *metric = NULL; + struct table_metric *table = NULL; + + char tag_set_buff[UDP_PAYLOAD_SIZE]; + char field_set_buff[UDP_PAYLOAD_SIZE]; + + int field_used_len = 0; + int tag_used_len = 0; + int i = 0, j = 0; + long long value; + int row_value_is_not_zero = 0; + + for(i = 0; i < instance->n_thread; i++) + { + head = &instance->n_thread_dynamic_metric[i]; + HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric) + { + row_value_is_not_zero = 0; + metrics = dyn_metric->metrics; + metric = metrics[0]; + memset(tag_set_buff, 0, sizeof(tag_set_buff)); + memset(field_set_buff, 0 ,sizeof(field_set_buff)); + field_used_len = 0; + tag_used_len = 0; + + if(metric->belong_to_table == 1) + { + table = instance->table_metrics[metric->table_id]; + for(j = 0; j < table->column_cnt; j++) + { + metric = metrics[i]; + value = metric->field_type == FIELD_TYPE_GAUGE ? + get_metric_unit_val(metric, FS_CALC_CURRENT, 0): + get_metric_unit_val(metric, FS_CALC_SPEED, 0); + if(value != 0) + { + row_value_is_not_zero = 1; + } + field_used_len += snprintf(field_set_buff + field_used_len, + sizeof(field_set_buff) - field_used_len, + "%s=%lld,", + metric->table_column_name, + value + ); + + } + if(row_value_is_not_zero == 1) + { + continue; + } + tag_used_len += snprintf(tag_set_buff + tag_used_len, + sizeof(tag_set_buff) - tag_used_len, + ",app_name=%s,table_name=%s", + instance->name, + table->name + ); + + } + else + { + value = metric->field_type == FIELD_TYPE_GAUGE ? + get_metric_unit_val(metric, FS_CALC_CURRENT, 0): + get_metric_unit_val(metric, FS_CALC_SPEED, 0); + if(value == 0) + { + continue; + } + snprintf(field_set_buff, sizeof(field_set_buff), "%s=%lld", metric->field_name, value); + tag_used_len += snprintf(tag_set_buff + tag_used_len, + sizeof(tag_set_buff) - tag_used_len, + ",app_name=%s", + instance->name + ); + + } + tag_used_len += output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, + tag_set_buff + tag_used_len, sizeof(tag_set_buff) - tag_used_len); + append_line_protocol_dynamic_row(instance, metric->field_name, tag_set_buff, field_set_buff); + } + + } + flush_line_protocol_dynamic_metric(instance); return 0; -}
\ No newline at end of file +} |
