#include #include #include #include #include "fieldstat.h" #include "fieldstat_internal.h" #include "cJSON.h" #include void check_telegraf_output_content(const char *filepath) { unsigned long long counter_sum = 0; unsigned long long bytes_sum = 0; unsigned long long packages_sum = 0; char read_line_buf[2048] = {0}; cJSON *cjson_metric = NULL; cJSON *cjson_fields = NULL; cJSON *value_counter = NULL; cJSON *value_bytes = NULL; cJSON *value_packages = NULL; FILE *fp = fopen(filepath, "r"); EXPECT_NE(nullptr, fp); while(!feof(fp)) { memset(read_line_buf, 0, sizeof(read_line_buf)); if(NULL == fgets(read_line_buf, sizeof(read_line_buf), fp)) { continue; } cjson_metric = cJSON_Parse(read_line_buf); EXPECT_NE(nullptr, cjson_metric); cjson_fields = cJSON_GetObjectItem(cjson_metric, "fields"); EXPECT_NE(nullptr, cjson_fields); value_bytes = cJSON_GetObjectItem(cjson_fields, "bytes_sum"); if(value_bytes != NULL && value_bytes->type == cJSON_Number) { bytes_sum += value_bytes->valueint; } value_packages = cJSON_GetObjectItem(cjson_fields, "packages_sum"); if(value_packages != NULL && value_packages->type == cJSON_Number) { packages_sum += value_packages->valueint; } value_counter = cJSON_GetObjectItem(cjson_fields, "counter_sum"); if(value_counter != NULL && value_counter->type == cJSON_Number) { counter_sum += value_counter->valueint; } cJSON_Delete(cjson_metric); } EXPECT_EQ(counter_sum, bytes_sum); EXPECT_EQ(counter_sum, packages_sum); fclose(fp); } struct thread_para { int loops; struct fieldstat_dynamic_instance * instance; int thread_id; int table_id; unsigned int *out_column_ids; double duration_s; }; void _worker_thread_metrics_operate(void *arg) { int ret = 0; unsigned long long counter = 0; double elapsed_time = 0.0; struct fieldstat_tag tags[3]; const char *row_name = "security_rule_hits"; struct thread_para *para = (struct thread_para*)arg; time_t start_time, end_time; tags[0].key = "policy_id"; tags[0].value_int = 1; tags[0].value_type = 0; tags[1].key = "quanlity"; tags[1].value_double = 0.50; tags[1].value_type = 1; tags[2].key = "device_id"; tags[2].value_str = "test_device"; tags[2].value_type = 2; start_time = time(NULL); for(;;) { end_time = time(NULL); elapsed_time = difftime(end_time, start_time); if(elapsed_time > para->duration_s) { break; } ret = fieldstat_dynamic_table_metric_value_incrby(para->instance, para->table_id, para->out_column_ids[0], row_name, 1, tags, sizeof(tags)/sizeof(tags[0]), para->thread_id); EXPECT_EQ(0, ret); ret = fieldstat_dynamic_table_metric_value_incrby(para->instance, para->table_id, para->out_column_ids[1], row_name, 1, tags, sizeof(tags)/sizeof(tags[0]), para->thread_id); EXPECT_EQ(0, ret); counter++; usleep(1); } ret = fieldstat_dynamic_metric_value_set(para->instance, FIELD_TYPE_COUNTER, "counter", counter, NULL, 0, para->thread_id); EXPECT_EQ(0, ret); return; } void * worker_thread_metrics_operate(void *arg) { _worker_thread_metrics_operate(arg); return NULL; } TEST(BenchMark, OneMetricMultiAddMultiThreads) { int ret = 0; int n_thread = 64; int n_loops = 10000000; int duration_s = 86400.0; int table_id = -1; void *temp = NULL; struct fieldstat_dynamic_instance *instance = NULL; unsigned int out_column_ids[2]; const char *column_name[] = {"packages", "bytes"}; enum field_type column_type[] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER}; struct thread_para para[n_thread]; pthread_t thread_ids[n_thread]; system("cat /dev/null > /tmp/metrics.out"); instance = fieldstat_dynamic_instance_new("firewall", n_thread); ret = fieldstat_dynamic_set_line_protocol_server(instance, "127.0.0.1", 8700); EXPECT_EQ(0, ret); ret = fieldstat_dynamic_set_output_interval(instance, 1000); EXPECT_EQ(0, ret); table_id = fieldstat_register_dynamic_table(instance, "tsg_master", column_name, column_type, sizeof(column_name)/sizeof(column_name[0]), out_column_ids); EXPECT_EQ(0, table_id); fieldstat_dynamic_instance_start(instance); for(int i = 0; i < n_thread; i++) { para[i].loops = n_loops; para[i].instance = instance; para[i].thread_id = i; para[i].table_id = table_id; para[i].out_column_ids = out_column_ids; para[i].duration_s = duration_s; } for(int i = 0; i < n_thread; i++) { ret = pthread_create(&(thread_ids[i]), NULL, worker_thread_metrics_operate, &(para[i])); EXPECT_EQ(0, ret); } for(int i = 0; i < n_thread; i++) { pthread_join(thread_ids[i], (void**)&temp); } sleep(4); check_telegraf_output_content("/tmp/metrics.out"); fieldstat_dynamic_instance_free(instance); } int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }