#include #include #include #include #include #include #include "fieldstat.h" #include "fieldstat_exporter.h" #include "utils.hpp" using namespace std; void fill_random_tag_of_length_1_to_3(Fieldstat_tag_list_wrapper *fields[], int tag_list_num) { std::uniform_int_distribution dist(1,100); std::mt19937 rng(); for (int i = 0; i < tag_list_num; i++) { Fieldstat_tag_list_wrapper *tmp = new Fieldstat_tag_list_wrapper(dist, rand() % 3 + 1); tmp->sort_tag_list(); fields[i] = tmp; } } void fill_with_elephant_flows(Fieldstat_tag_list_wrapper *fields[], int tag_list_num) { for (int i = 0; i < tag_list_num; i++) { Fieldstat_tag_list_wrapper *tmp; int rand_ret = rand() % 10; if (rand_ret < 5) { tmp = new Fieldstat_tag_list_wrapper("elephant", rand() % 50); // most hit } else if (rand_ret == 6 || rand_ret == 7) { tmp = new Fieldstat_tag_list_wrapper("mid", rand() % 200); } else { tmp = new Fieldstat_tag_list_wrapper("mouse", rand() % 10000); } fields[i] = tmp; } } long long fuzz_fieldstat_counter_get(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *tag_list) { long long value = 0; int ret = fieldstat_counter_get(instance, cube_id, tag_list, metric_id, &value); EXPECT_EQ(ret, 0); return value; } double fuzz_fieldstat_hll_get(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *tag_list) { double value = 0; int ret = fieldstat_hll_get(instance, cube_id, tag_list, metric_id, &value); EXPECT_EQ(ret, 0); return value; } TEST(Fuzz_test, many_instance_random_flow_unregister_calibrate_reset_fork_merge_comprehensive) { const int METRIC_NUM = 2; const int METRIC_ID_COUNTER = 0; const int METRIC_ID_HLL = 1; const int CUBE_NUM = 5; const int INSTANCE_NUM = 10; const int FLOW_NUM = 5000; const int CELL_MAX = 5000; // must be no less than FLOW_NUM to ensure an accurate statistics const int TEST_ROUND = 100000; const int OUT_GAP = 10000; const char *metric_name[METRIC_NUM] = {"counter_", "hll_"}; struct fieldstat *master = fieldstat_new(); struct fieldstat *replica[INSTANCE_NUM]; struct fieldstat *dest = fieldstat_new(); Fieldstat_tag_list_wrapper *shared_tags[CUBE_NUM]; // init cube for (int i = 0; i < CUBE_NUM; i++) { shared_tags[i] = new Fieldstat_tag_list_wrapper("shared_tag", i); int cube_id = fieldstat_create_cube(master, shared_tags[i]->get_tag(), shared_tags[i]->get_tag_count(), SAMPLING_MODE_COMPREHENSIVE, CELL_MAX); EXPECT_EQ(cube_id, i); fieldstat_register_counter(master, cube_id, metric_name[METRIC_ID_COUNTER]); fieldstat_register_hll(master, cube_id, metric_name[METRIC_ID_HLL], 6); } // all the possible fields Fieldstat_tag_list_wrapper *tag_list_wrapper[FLOW_NUM]; fill_random_tag_of_length_1_to_3(tag_list_wrapper, FLOW_NUM); //all the possible operations long long rand_nums[TEST_ROUND]; string *rand_strs[TEST_ROUND] = {NULL}; for (int i = 0; i < TEST_ROUND; i++) { rand_nums[i] = rand() % 1000; rand_strs[i] = new string(string("str val") + std::to_string(rand_nums[i])); } //init instance for (int i = 0; i < INSTANCE_NUM; i++) { replica[i] = fieldstat_fork(master); } // for benchmark unordered_map comp_count; unordered_map> comp_hll; clock_t start = clock(); int next_shared_tag_value = CUBE_NUM; for (int i = 0; i < TEST_ROUND; i++) { if (i != 0 && i % OUT_GAP == 0) { // merge for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_merge(dest, replica[j]); } for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_reset(replica[j]); } // modify master and calibrate int cube_id_to_change = rand() % CUBE_NUM; Fieldstat_tag_list_wrapper *new_tag = new Fieldstat_tag_list_wrapper("shared_tag", next_shared_tag_value++); delete shared_tags[cube_id_to_change]; shared_tags[cube_id_to_change] = new_tag; fieldstat_destroy_cube(master, cube_id_to_change); int cube_id_new = fieldstat_create_cube(master, new_tag->get_tag(), new_tag->get_tag_count(), SAMPLING_MODE_COMPREHENSIVE, CELL_MAX); fieldstat_register_counter(master, cube_id_new, metric_name[METRIC_ID_COUNTER]); fieldstat_register_hll(master, cube_id_new, metric_name[METRIC_ID_HLL], 6); EXPECT_EQ(cube_id_new, cube_id_to_change); // should new the cube in the hole leaved by the destroyed cube // calibrate for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_calibrate(master, replica[j]); } // check if no merge happens in the last 100 rounds if (i + OUT_GAP >= TEST_ROUND) { break; } } struct fieldstat *instance = replica[rand() % INSTANCE_NUM]; // the flow randomly goes to one of the instance const Fieldstat_tag_list_wrapper * field = tag_list_wrapper[rand() % FLOW_NUM]; int cube_id = rand() % CUBE_NUM; const Fieldstat_tag_list_wrapper *shared_tag = shared_tags[cube_id]; int ret_add = fieldstat_counter_incrby(instance, cube_id, METRIC_ID_COUNTER, field->get_tag(), field->get_tag_count(), rand_nums[i]); if (ret_add == FS_ERR_TOO_MANY_CELLS) { continue; } EXPECT_EQ(ret_add, FS_OK); string *val = rand_strs[i]; ret_add = fieldstat_hll_add(instance, cube_id, METRIC_ID_HLL, field->get_tag(), field->get_tag_count(), val->c_str(), val->size()); EXPECT_EQ(ret_add, FS_OK); string cell_key = shared_tag->to_string() + field->to_string(); comp_count[cell_key] += rand_nums[i]; comp_hll[cell_key].insert(*val); } clock_t end = clock(); printf("time: %lf\n", (double)(end - start) / CLOCKS_PER_SEC); for (int i = 0; i < TEST_ROUND; i++) { delete rand_strs[i]; } for (int i = 0; i < FLOW_NUM; i++) { delete tag_list_wrapper[i]; } for (int i = 0; i < CUBE_NUM; i++) { delete shared_tags[i]; } long long dummy_ll; double dummy_d; int *cube_ids; int cube_num; struct fieldstat *instance_in_focus = dest; fieldstat_get_cubes(instance_in_focus, &cube_ids, &cube_num); for (int i = 0; i < cube_num; i++) { struct field_list *shared_tag_out = fieldstat_cube_get_tags(instance_in_focus, cube_ids[i]); size_t cell_num0; struct field_list *tags0; fieldstat_cube_get_cells(instance_in_focus, cube_ids[i], &tags0, &cell_num0); for (size_t j = 0; j < cell_num0; j++) { EXPECT_EQ(fieldstat_counter_get(instance_in_focus, cube_ids[i], &tags0[j], METRIC_ID_COUNTER, &dummy_ll), FS_OK); EXPECT_EQ(fieldstat_hll_get(instance_in_focus, cube_ids[i], &tags0[j], METRIC_ID_HLL, &dummy_d), FS_OK); } for (size_t j = 0; j < cell_num0; j++) { string tag_str_out = Fieldstat_tag_list_wrapper(&tags0[j]).to_string(); string cell_key = Fieldstat_tag_list_wrapper(shared_tag_out).to_string() + tag_str_out; EXPECT_EQ(comp_count[cell_key], fuzz_fieldstat_counter_get(instance_in_focus, cube_ids[i], 0, &tags0[j])); } fieldstat_tag_list_arr_free(tags0, cell_num0); fieldstat_tag_list_arr_free(shared_tag_out, 1); } free(cube_ids); fieldstat_free(master); fieldstat_free(dest); for (int i = 0; i < INSTANCE_NUM; i++) { fieldstat_free(replica[i]); } } TEST(Fuzz_test, many_instance_random_flow_unregister_calibrate_reset_fork_merge_topk) { const int CUBE_NUM = 5; const int INSTANCE_NUM = 10; const int FLOW_NUM = 50000; const int CELL_MAX = 50; const int TEST_ROUND = 100000; const int OUT_GAP = 10000; struct fieldstat *master = fieldstat_new(); struct fieldstat *replica[INSTANCE_NUM]; struct fieldstat *dest = fieldstat_new(); Fieldstat_tag_list_wrapper *shared_tags[CUBE_NUM]; // init cube for (int i = 0; i < CUBE_NUM; i++) { shared_tags[i] = new Fieldstat_tag_list_wrapper("shared_tag", i); int cube_id = fieldstat_create_cube(master, shared_tags[i]->get_tag(), shared_tags[i]->get_tag_count(), SAMPLING_MODE_TOPK, CELL_MAX); EXPECT_EQ(cube_id, i); fieldstat_register_counter(master, cube_id, "topk"); } // all the possible fields Fieldstat_tag_list_wrapper *tag_list_wrapper[FLOW_NUM]; fill_with_elephant_flows(tag_list_wrapper, FLOW_NUM); //all the possible operations long long rand_nums[TEST_ROUND]; for (int i = 0; i < TEST_ROUND; i++) { rand_nums[i] = rand() % 1000; } //init instance for (int i = 0; i < INSTANCE_NUM; i++) { replica[i] = fieldstat_fork(master); } // for benchmark unordered_map> count_map; // hte first key is shared field, second key is field clock_t start = clock(); int next_shared_tag_value = CUBE_NUM; for (int i = 0; i < TEST_ROUND; i++) { if (i != 0 && i % OUT_GAP == 0) { // merge for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_merge(dest, replica[j]); } for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_reset(replica[j]); } // modify master and calibrate int cube_id_to_change = rand() % CUBE_NUM; Fieldstat_tag_list_wrapper *new_tag = new Fieldstat_tag_list_wrapper("shared_tag", next_shared_tag_value++); delete shared_tags[cube_id_to_change]; shared_tags[cube_id_to_change] = new_tag; fieldstat_destroy_cube(master, cube_id_to_change); int cube_id_new = fieldstat_create_cube(master, new_tag->get_tag(), new_tag->get_tag_count(), SAMPLING_MODE_TOPK, CELL_MAX); fieldstat_register_counter(master, cube_id_new, "topk"); EXPECT_EQ(cube_id_new, cube_id_to_change); // should new the cube in the hole leaved by the destroyed cube // calibrate for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_calibrate(master, replica[j]); } // check if no merge happens in the last 100 rounds if (i + OUT_GAP >= TEST_ROUND) { break; } } struct fieldstat *instance = replica[rand() % INSTANCE_NUM]; // the flow randomly goes to one of the instance const Fieldstat_tag_list_wrapper * field = tag_list_wrapper[rand() % FLOW_NUM]; int cube_id = rand() % CUBE_NUM; const Fieldstat_tag_list_wrapper *shared_tag = shared_tags[cube_id]; int ret_add = fieldstat_counter_incrby(instance, cube_id, 0, field->get_tag(), field->get_tag_count(), rand_nums[i]); if (ret_add == FS_ERR_TOO_MANY_CELLS) { continue; } EXPECT_EQ(ret_add, FS_OK); count_map[shared_tag->to_string()][field->to_string()] += rand_nums[i]; } clock_t end = clock(); printf("time: %lf\n", (double)(end - start) / CLOCKS_PER_SEC); for (int i = 0; i < FLOW_NUM; i++) { delete tag_list_wrapper[i]; } for (int i = 0; i < CUBE_NUM; i++) { delete shared_tags[i]; } int *cube_ids; int cube_num; struct fieldstat *instance_in_focus = dest; fieldstat_get_cubes(instance_in_focus, &cube_ids, &cube_num); for (int i = 0; i < cube_num; i++) { struct field_list *shared_tag_out = fieldstat_cube_get_tags(instance_in_focus, cube_ids[i]); size_t cell_num; struct field_list *fields; fieldstat_cube_get_cells(instance_in_focus, cube_ids[i], &fields, &cell_num); std::vector test_result; for (size_t j = 0; j < cell_num; j++) { test_result.push_back(new Fieldstat_tag_list_wrapper(&fields[j])); } double accuracy = test_cal_topk_accuracy(test_result, count_map[Fieldstat_tag_list_wrapper(shared_tag_out).to_string()]); EXPECT_GE(accuracy, 0.95); // printf("topk accuracy: %lf\n", accuracy); for (size_t j = 0; j < cell_num; j++) { delete test_result[j]; } fieldstat_tag_list_arr_free(fields, cell_num); fieldstat_tag_list_arr_free(shared_tag_out, 1); } free(cube_ids); fieldstat_free(master); fieldstat_free(dest); for (int i = 0; i < INSTANCE_NUM; i++) { fieldstat_free(replica[i]); } } TEST(Fuzz_test, many_instance_random_flow_unregister_calibrate_reset_fork_merge_spreadsketch) { const int CUBE_NUM = 5; const int INSTANCE_NUM = 10; const int CELL_MAX = 50; const int TEST_ROUND = 100000; const int OUT_GAP = 10000; struct fieldstat *master = fieldstat_new(); struct fieldstat *replica[INSTANCE_NUM]; struct fieldstat *dest = fieldstat_new(); Fieldstat_tag_list_wrapper *shared_tags[CUBE_NUM]; // init cube for (int i = 0; i < CUBE_NUM; i++) { shared_tags[i] = new Fieldstat_tag_list_wrapper("shared_tag", i); int cube_id = fieldstat_create_cube(master, shared_tags[i]->get_tag(), shared_tags[i]->get_tag_count(), SAMPLING_MODE_TOP_CARDINALITY, CELL_MAX); EXPECT_EQ(cube_id, i); fieldstat_register_hll(master, cube_id, "hll", 6); } //init instance for (int i = 0; i < INSTANCE_NUM; i++) { replica[i] = fieldstat_fork(master); } SpreadSketchZipfGenerator generator(1.0, CELL_MAX * 10); unordered_map> count_map; // the first key is cube dimension, second key is cell dimension. value is the fanout(hll return value) clock_t start = clock(); int next_shared_tag_value = CUBE_NUM; for (int i = 0; i < TEST_ROUND; i++) { if (i != 0 && i % OUT_GAP == 0) { // merge for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_merge(dest, replica[j]); } for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_reset(replica[j]); } // modify master and calibrate int cube_id_to_change = rand() % CUBE_NUM; Fieldstat_tag_list_wrapper *new_tag = new Fieldstat_tag_list_wrapper("shared_tag", next_shared_tag_value++); delete shared_tags[cube_id_to_change]; shared_tags[cube_id_to_change] = new_tag; fieldstat_destroy_cube(master, cube_id_to_change); int cube_id_new = fieldstat_create_cube(master, new_tag->get_tag(), new_tag->get_tag_count(), SAMPLING_MODE_TOP_CARDINALITY, CELL_MAX); fieldstat_register_hll(master, cube_id_new, "hll", 6); EXPECT_EQ(cube_id_new, cube_id_to_change); // should new the cube in the hole leaved by the destroyed cube // calibrate for (int j = 0; j < INSTANCE_NUM; j++) { fieldstat_calibrate(master, replica[j]); } // let merge happens last(no add operation is missed) if (i + OUT_GAP >= TEST_ROUND) { break; } } // add Flow flow = generator.next(); struct fieldstat *instance = replica[rand() % INSTANCE_NUM]; const Fieldstat_tag_list_wrapper cell_dimension("src_ip", flow.src_ip.c_str()); const Fieldstat_tag_list_wrapper item("dst_ip", flow.dst_ip.c_str()); int cube_id = rand() % CUBE_NUM; const Fieldstat_tag_list_wrapper *shared_tag = shared_tags[cube_id]; int ret_add = fieldstat_hll_add_field(instance, cube_id, 0, cell_dimension.get_tag(), cell_dimension.get_tag_count(), item.get_tag(), item.get_tag_count()); if (ret_add == FS_ERR_TOO_MANY_CELLS) { continue; } EXPECT_EQ(ret_add, FS_OK); count_map[shared_tag->to_string()][cell_dimension.to_string()] += 1; } clock_t end = clock(); printf("time: %lf\n", (double)(end - start) / CLOCKS_PER_SEC); for (int i = 0; i < CUBE_NUM; i++) { delete shared_tags[i]; } int *cube_ids; int cube_num; struct fieldstat *instance_in_focus = dest; fieldstat_get_cubes(instance_in_focus, &cube_ids, &cube_num); for (int i = 0; i < cube_num; i++) { struct field_list *shared_tag_out = fieldstat_cube_get_tags(instance_in_focus, cube_ids[i]); size_t cell_num; struct field_list *cells; fieldstat_cube_get_cells(instance_in_focus, cube_ids[i], &cells, &cell_num); std::vector test_result; for (size_t j = 0; j < cell_num; j++) { test_result.push_back(new Fieldstat_tag_list_wrapper(&cells[j])); } double accuracy = test_cal_topk_accuracy(test_result, count_map[Fieldstat_tag_list_wrapper(shared_tag_out).to_string()]); EXPECT_GE(accuracy, 0.7); for (size_t j = 0; j < cell_num; j++) { delete test_result[j]; } fieldstat_tag_list_arr_free(cells, cell_num); fieldstat_tag_list_arr_free(shared_tag_out, 1); } free(cube_ids); fieldstat_free(master); fieldstat_free(dest); for (int i = 0; i < INSTANCE_NUM; i++) { fieldstat_free(replica[i]); } } // issue: https://jira.geedge.net/browse/TSG-21192 // 在reset后,所有项都是dying 状态,此时添加count = 0 的项,不能正常把dying pop掉,误以为sorted set 已满,出现添加失败(FS_ERR_TOO_MANY_CELLS)但是查不到任何cell 的情况。 TEST(Fuzz_test, add_and_reset_with_randomly_generated_flows_and_randomly_chosen_metric) { const int FLOW_NUM = 50000; Fieldstat_tag_list_wrapper *tag_list_wrapper[FLOW_NUM]; fill_with_elephant_flows(tag_list_wrapper, FLOW_NUM); struct fieldstat *instance = fieldstat_new(); int cube_id = fieldstat_create_cube(instance,NULL,0,SAMPLING_MODE_TOPK, 1); // K = 1, just to increase the possibility of FS_ERR_TOO_MANY_CELLS int primary_metric_id = fieldstat_register_counter(instance, cube_id, "counter"); int counter2_id = fieldstat_register_counter(instance, cube_id, "counter2"); fieldstat_counter_incrby(instance, cube_id, primary_metric_id, tag_list_wrapper[0]->get_tag(), tag_list_wrapper[0]->get_tag_count(), 1); fieldstat_counter_incrby(instance, cube_id, counter2_id, tag_list_wrapper[0]->get_tag(), tag_list_wrapper[0]->get_tag_count(), 1); struct field_list tag_list_tmp = {NULL, 0}; for(int i = 0; i < FLOW_NUM; i++) { int using_id = rand() % 2 == 0 ? primary_metric_id : counter2_id; int ret = fieldstat_counter_incrby(instance, cube_id, using_id, tag_list_wrapper[i]->get_tag(), tag_list_wrapper[i]->get_tag_count(), 1); if (ret == FS_ERR_TOO_MANY_CELLS) { struct field_list *tag_list = NULL; size_t n_cell = 0; fieldstat_cube_get_cells(instance, cube_id, &tag_list, &n_cell); ASSERT_EQ(n_cell, 1); long long value; tag_list_tmp.field = (struct field *)tag_list_wrapper[i]->get_tag(); tag_list_tmp.n_field = tag_list_wrapper[i]->get_tag_count(); int counter_exist = fieldstat_counter_get(instance, cube_id, &tag_list_tmp, using_id, &value); ASSERT_EQ(counter_exist, FS_ERR_INVALID_TAG); // the field is not added to the cube fieldstat_tag_list_arr_free(tag_list, n_cell); } if (i % 1000 == 0) { fieldstat_reset(instance); } } for (int i = 0; i < FLOW_NUM; i++) { delete tag_list_wrapper[i]; } fieldstat_free(instance); } TEST(Fuzz_test, simple_one_for_perf) { const int CUBE_NUM = 5; const int FLOW_NUM = 50000; const int CELL_MAX = 50; const int TEST_ROUND = 500000; struct fieldstat *master = fieldstat_new(); Fieldstat_tag_list_wrapper *shared_tags[CUBE_NUM]; // init cube for (int i = 0; i < CUBE_NUM; i++) { shared_tags[i] = new Fieldstat_tag_list_wrapper("shared_tag", i); int cube_id = fieldstat_create_cube(master, shared_tags[i]->get_tag(), shared_tags[i]->get_tag_count(), SAMPLING_MODE_TOP_CARDINALITY, CELL_MAX); EXPECT_EQ(cube_id, i); fieldstat_register_counter(master, cube_id, "topk"); } // init metric // all the possible fields Fieldstat_tag_list_wrapper *tag_list_wrapper[FLOW_NUM]; fill_with_elephant_flows(tag_list_wrapper, FLOW_NUM); //all the possible operations long long *rand_nums = new long long[TEST_ROUND]; for (int i = 0; i < TEST_ROUND; i++) { rand_nums[i] = rand() % 1000; } struct fieldstat *instance = master; clock_t start = clock(); printf("press any key to start v46\n"); getchar(); for (int i = 0; i < TEST_ROUND; i++) { const Fieldstat_tag_list_wrapper * field = tag_list_wrapper[rand() % FLOW_NUM]; int cube_id = rand() % CUBE_NUM; (void)fieldstat_counter_incrby(instance, cube_id, 0, field->get_tag(), field->get_tag_count(), rand_nums[i]); } clock_t end = clock(); printf("time: %lf\n", (double)(end - start) / CLOCKS_PER_SEC); for (int i = 0; i < FLOW_NUM; i++) { delete tag_list_wrapper[i]; } for (int i = 0; i < CUBE_NUM; i++) { delete shared_tags[i]; } delete[] rand_nums; fieldstat_free(master); } int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); // testing::GTEST_FLAG(filter) = "*spreadsketch"; // testing::GTEST_FLAG(filter) = "-Fuzz_test.simple_one_for_perf"; return RUN_ALL_TESTS(); }