summaryrefslogtreecommitdiff
path: root/src/line_protocol_output.cpp
diff options
context:
space:
mode:
authorfumingwei <[email protected]>2023-03-16 21:01:51 +0800
committerfumingwei <[email protected]>2023-03-17 23:16:04 +0800
commitea4c2b9c11ef8a02f745b514f4a54f07512a7e8b (patch)
tree8529db19611184292e4a92e6cf6b71dc69f73b6f /src/line_protocol_output.cpp
parent666234f661f5426630aa07554a67a47656bde656 (diff)
feature:新增动态metric相关接口,TODO test
Diffstat (limited to 'src/line_protocol_output.cpp')
-rw-r--r--src/line_protocol_output.cpp177
1 files changed, 161 insertions, 16 deletions
diff --git a/src/line_protocol_output.cpp b/src/line_protocol_output.cpp
index 56876fe..3f4e86c 100644
--- a/src/line_protocol_output.cpp
+++ b/src/line_protocol_output.cpp
@@ -1,6 +1,25 @@
#include "fieldstat_internal.h"
-static void flush_line_protocol_metric(struct fieldstat_instance *instance)
+/*
+static void flush_line_protocol_metric_send_buff(int send_socket, unsigned int server_ip, unsigned short server_port, const char *send_buff, int *send_buff_offset)
+{
+ int offset = *send_buff_offset;
+ if(offset == 0)
+ {
+ return;
+ }
+
+ if(server_ip > 0 && server_port > 0)
+ {
+ send_udp(send_socket, server_ip, server_port, send_buff, offset);
+ }
+
+ *send_buff_offset = 0;
+ memset(send_buff, 0, UDP_PAYLOAD_SIZE);
+}
+*/
+
+static void flush_line_protocol_metric_send_buff(struct fieldstat_instance *instance)
{
if(instance->line_protocol_send_buff_offset == 0)
{
@@ -16,11 +35,11 @@ static void flush_line_protocol_metric(struct fieldstat_instance *instance)
);
}
instance->line_protocol_send_buff_offset = 0;
- memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff));
+ memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff));
return;
}
-static void append_line_protocol_line(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set)
+static void append_line_protocol_metric_to_send_buff(struct fieldstat_instance *instance, const char* measurement, char *tag_set, char *field_set)
{
if(field_set==NULL)
{
@@ -28,7 +47,7 @@ static void append_line_protocol_line(struct fieldstat_instance *instance, const
}
if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2)
{
- flush_line_protocol_metric(instance);
+ flush_line_protocol_metric_send_buff(instance);
}
printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set);
instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset,
@@ -65,8 +84,6 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t
char field_set_buff[UDP_PAYLOAD_SIZE];
char tag_set_buff[UDP_PAYLOAD_SIZE];
- memset(field_set_buff, 0, sizeof(field_set_buff));
- memset(tag_set_buff, 0, sizeof(tag_set_buff));
char *tag_pos = tag_set_buff;
char *field_pos = field_set_buff;
@@ -87,6 +104,9 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t
line_value_is_not_zero = 0;
tag_pos = tag_set_buff;
field_pos = field_set_buff;
+ memset(field_set_buff, 0, sizeof(field_set_buff));
+ memset(tag_set_buff, 0, sizeof(tag_set_buff));
+ line_value_is_not_zero = 0;
for(k = 0; k < table->column_cnt; k ++)
{
metric = get_metric(instance, line->metric_id_belong_to_line[k]);
@@ -110,7 +130,6 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t
{
continue;
}
-
tag_pos += snprintf(tag_pos,
sizeof(tag_set_buff) - (tag_pos - tag_set_buff),
",app_name=%s,table_name=%s",
@@ -122,12 +141,8 @@ static void output_line_protocol_table(struct fieldstat_instance *instance,int t
sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
- if(field_pos - field_set_buff > 0)
- {
- *(field_pos - 1) = '\0';
- }
// measurement,tag_set field_set
- append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
+ append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff);
}
}
@@ -181,7 +196,7 @@ int line_protocol_output(struct fieldstat_instance *instance)
instance->name
);
output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
- append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
+ append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff);
tag_pos = tag_set_buff;
}
@@ -197,7 +212,7 @@ int line_protocol_output(struct fieldstat_instance *instance)
instance->name
);
output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag, tag_pos, sizeof(tag_set_buff) - (tag_pos - tag_set_buff));
- append_line_protocol_line(instance, metric->field_name, tag_set_buff, field_set_buff);
+ append_line_protocol_metric_to_send_buff(instance, metric->field_name, tag_set_buff, field_set_buff);
tag_pos = tag_set_buff;
}
break;
@@ -208,7 +223,137 @@ int line_protocol_output(struct fieldstat_instance *instance)
}
output_line_protocol_table(instance,tables_line_cnt,current_table_cnt);
- flush_line_protocol_metric(instance);
+ flush_line_protocol_metric_send_buff(instance);
+
+ return 0;
+}
+
+static void flush_line_protocol_dynamic_metric(struct fieldstat_dynamic_instance *instance)
+{
+ if(instance->line_protocol_send_buff_offset == 0)
+ {
+ return;
+ }
+
+ if(instance->line_protocol_server_ip > 0 && instance->line_protocol_server_port > 0)
+ {
+ send_udp(instance->line_protocol_socket, instance->line_protocol_server_ip,
+ (unsigned short)instance->line_protocol_server_port,
+ instance->line_protocol_send_buff,
+ instance->line_protocol_send_buff_offset
+ );
+ }
+ instance->line_protocol_send_buff_offset = 0;
+ memset(instance->line_protocol_send_buff, 0, sizeof(instance->line_protocol_send_buff));
+ return;
+}
+
+static void append_line_protocol_dynamic_row(struct fieldstat_dynamic_instance *instance, const char* measurement, char *tag_set, char *field_set)
+{
+ if(field_set==NULL)
+ {
+ return;
+ }
+ if(UDP_PAYLOAD_SIZE - (unsigned int)instance->line_protocol_send_buff_offset < strlen(measurement) + strlen(field_set) + strlen(tag_set) + 2)
+ {
+ flush_line_protocol_dynamic_metric(instance);
+ }
+ printf("Line_protocol metric: %s%s %s\n",measurement,tag_set,field_set);
+ instance->line_protocol_send_buff_offset += snprintf(instance->line_protocol_send_buff + instance->line_protocol_send_buff_offset,
+ sizeof(instance->line_protocol_send_buff) - instance->line_protocol_send_buff_offset,
+ "%s%s %s\n",
+ measurement, tag_set, field_set
+ );
+ return;
+}
+
+int line_protocol_dynamic_metric_output(struct fieldstat_dynamic_instance *instance)
+{
+ struct dynamic_metric **head = NULL;
+ struct dynamic_metric *dyn_metric, *tmp_dyn_metric;
+ struct metric **metrics = NULL;
+ struct metric *metric = NULL;
+ struct table_metric *table = NULL;
+
+ char tag_set_buff[UDP_PAYLOAD_SIZE];
+ char field_set_buff[UDP_PAYLOAD_SIZE];
+
+ int field_used_len = 0;
+ int tag_used_len = 0;
+ int i = 0, j = 0;
+ long long value;
+ int row_value_is_not_zero = 0;
+
+ for(i = 0; i < instance->n_thread; i++)
+ {
+ head = &instance->n_thread_dynamic_metric[i];
+ HASH_ITER(hh, *head, dyn_metric, tmp_dyn_metric)
+ {
+ row_value_is_not_zero = 0;
+ metrics = dyn_metric->metrics;
+ metric = metrics[0];
+ memset(tag_set_buff, 0, sizeof(tag_set_buff));
+ memset(field_set_buff, 0 ,sizeof(field_set_buff));
+ field_used_len = 0;
+ tag_used_len = 0;
+
+ if(metric->belong_to_table == 1)
+ {
+ table = instance->table_metrics[metric->table_id];
+ for(j = 0; j < table->column_cnt; j++)
+ {
+ metric = metrics[i];
+ value = metric->field_type == FIELD_TYPE_GAUGE ?
+ get_metric_unit_val(metric, FS_CALC_CURRENT, 0):
+ get_metric_unit_val(metric, FS_CALC_SPEED, 0);
+ if(value != 0)
+ {
+ row_value_is_not_zero = 1;
+ }
+ field_used_len += snprintf(field_set_buff + field_used_len,
+ sizeof(field_set_buff) - field_used_len,
+ "%s=%lld,",
+ metric->table_column_name,
+ value
+ );
+
+ }
+ if(row_value_is_not_zero == 1)
+ {
+ continue;
+ }
+ tag_used_len += snprintf(tag_set_buff + tag_used_len,
+ sizeof(tag_set_buff) - tag_used_len,
+ ",app_name=%s,table_name=%s",
+ instance->name,
+ table->name
+ );
+
+ }
+ else
+ {
+ value = metric->field_type == FIELD_TYPE_GAUGE ?
+ get_metric_unit_val(metric, FS_CALC_CURRENT, 0):
+ get_metric_unit_val(metric, FS_CALC_SPEED, 0);
+ if(value == 0)
+ {
+ continue;
+ }
+ snprintf(field_set_buff, sizeof(field_set_buff), "%s=%lld", metric->field_name, value);
+ tag_used_len += snprintf(tag_set_buff + tag_used_len,
+ sizeof(tag_set_buff) - tag_used_len,
+ ",app_name=%s",
+ instance->name
+ );
+
+ }
+ tag_used_len += output_line_protocol_tag_set_buf(metric->tag_key, metric->tag_value, metric->n_tag,
+ tag_set_buff + tag_used_len, sizeof(tag_set_buff) - tag_used_len);
+ append_line_protocol_dynamic_row(instance, metric->field_name, tag_set_buff, field_set_buff);
+ }
+
+ }
+ flush_line_protocol_dynamic_metric(instance);
return 0;
-} \ No newline at end of file
+}