diff options
| author | chenzizhan <[email protected]> | 2024-07-15 18:15:52 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2024-07-15 18:15:52 +0800 |
| commit | 482d4ce120cfa93105991c5edaa3105848a188fe (patch) | |
| tree | 8b53ee4665b97a4ccab5de5579f0f49448a9c054 | |
| parent | 18250bfcca305840c82eb79ad85aae3932565793 (diff) | |
faster spreadsketch by min level
| -rw-r--r-- | src/cells/spread_sketch.c | 47 | ||||
| -rw-r--r-- | test/test_fuzz_test.cpp | 36 |
2 files changed, 73 insertions, 10 deletions
diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c index 8ad7c74..a05d2bd 100644 --- a/src/cells/spread_sketch.c +++ b/src/cells/spread_sketch.c @@ -51,9 +51,8 @@ struct spread_sketch { struct bucket *buckets; struct entry_table *table; - uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。 - // TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。 - // 或者简单记录用掉的bucket 数量也挺好。 + int level_cnt[65]; // 64: the level range is [1,65] // count every level number。 + uint32_t level_min; // the minimum level in the sketch }; static void *default_new_fn(void *arg) { @@ -170,6 +169,7 @@ void get_parameter_recommendation(int max_super_spreader_number, int *depth_out, struct spread_sketch *spread_sketch_new(int expected_query_num) { struct spread_sketch *pthis = malloc(sizeof(struct spread_sketch)); + get_parameter_recommendation(expected_query_num, &pthis->depth, &pthis->width); pthis->buckets = calloc(pthis->depth * pthis->width, sizeof(struct bucket)); @@ -181,15 +181,37 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) { pthis->table = smart_ptr_table_new(); pthis->table->scheme = pthis->scheme; + pthis->level_min = 0; + memset(pthis->level_cnt, 0, sizeof(pthis->level_cnt)); + pthis->level_cnt[0] = pthis->depth * pthis->width; + return pthis; } +void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32_t level_new) { + ss->level_cnt[level_old]--; + ss->level_cnt[level_new]++; + while (ss->level_cnt[ss->level_min] == 0) { + ss->level_min++; + } +} + // return 0 if not added, return 1 if added int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg) { - // uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171); uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; - // printf("spread_sketch_add key %s, level %u\n", key, level); - + if (level <= ss->level_min) { + if (item_hash == DUMMY_ITEM_HASH) { + return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch + } + + struct entry *content; + HASH_FIND(hh, ss->table->entry, key, key_length, content); + if (content != NULL && !content->dying) { + return 1; + } else { + return 0; + } + } // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x) // Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions. @@ -207,6 +229,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng bucket->content->dying = false; if (bucket->level < level) { + min_level_state_update(ss, bucket->level, level); bucket->level = level; } in_sketch = true; @@ -214,6 +237,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng uint32_t old_level = bucket->content == NULL ? 0: bucket->level; if (level > old_level) { + min_level_state_update(ss, old_level, level); // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, old_level, level, i, hash_x % ss->width); const struct entry *content_old = bucket->content; if (content_old != NULL) { @@ -251,19 +275,23 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * if (bucket_dst->content == NULL) { bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; + min_level_state_update(dst, 0, bucket_src->level); continue; } bucket_dst->content->dying = false; if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) { if (bucket_src->level > bucket_dst->level) { + min_level_state_update(dst, bucket_dst->level, bucket_src->level); bucket_dst->level = bucket_src->level; } } else { if (bucket_src->level > bucket_dst->level) { + min_level_state_update(dst, bucket_dst->level, bucket_src->level); + bucket_dst->level = bucket_src->level; + smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len); bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); - bucket_dst->level = bucket_src->level; } } } @@ -305,6 +333,9 @@ void spread_sketch_reset(struct spread_sketch *ss) { ss->scheme.reset_fn(content->exdata); content->dying = true; } + + memset(ss->level_cnt, 0, sizeof(ss->level_cnt)); + ss->level_cnt[0] = ss->depth * ss->width; } void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { @@ -347,7 +378,7 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { struct spread_sketch *dst = malloc(sizeof(struct spread_sketch)); - memcpy(dst, src, sizeof(struct spread_sketch)); + memcpy(dst, src, sizeof(struct spread_sketch)); // copy the depth, width, level_cnt, level_min dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket)); dst->table = smart_ptr_table_new(); diff --git a/test/test_fuzz_test.cpp b/test/test_fuzz_test.cpp index 8989de7..e9cd3b3 100644 --- a/test/test_fuzz_test.cpp +++ b/test/test_fuzz_test.cpp @@ -434,6 +434,7 @@ TEST(Fuzz_test, many_instance_random_flow_unregister_calibrate_reset_fork_merge_ } double accuracy = test_cal_topk_accuracy(test_result, count_map[Fieldstat_tag_list_wrapper(shared_tag_out).to_string()]); + // printf("spreadsketch accuracy: %lf\n", accuracy); EXPECT_GE(accuracy, 0.7); for (size_t j = 0; j < cell_num; j++) { @@ -501,7 +502,7 @@ TEST(Fuzz_test, add_and_reset_with_randomly_generated_flows_and_randomly_chosen_ } -TEST(Fuzz_test, simple_one_for_perf) +TEST(Fuzz_test, simple_one_for_perf_topk) { const int CUBE_NUM = 5; const int FLOW_NUM = 50000; @@ -556,12 +557,43 @@ TEST(Fuzz_test, simple_one_for_perf) fieldstat_free(master); } +TEST(Fuzz_test, simple_one_for_perf_spreadsketch) +{ + const int CELL_MAX = 100; + const int TEST_ROUND = 500000; + struct fieldstat *instance = fieldstat_new(); + + int cube_id = fieldstat_create_cube(instance, &TEST_TAG_STRING, 1, SAMPLING_MODE_TOP_CARDINALITY, CELL_MAX); + fieldstat_register_hll(instance, cube_id, "hll", 6); + + SpreadSketchZipfGenerator generator(1.0, CELL_MAX * 10); + Fieldstat_tag_list_wrapper *cell_dimension[TEST_ROUND]; + Fieldstat_tag_list_wrapper *items[TEST_ROUND]; + for (int i = 0; i < TEST_ROUND; i++) { + Flow flow = generator.next(); + cell_dimension[i] = new Fieldstat_tag_list_wrapper("src_ip", flow.src_ip.c_str()); + items[i] = new Fieldstat_tag_list_wrapper("dst_ip", flow.dst_ip.c_str()); + } + + clock_t start = clock(); + printf("press any key to start \n"); + getchar(); + + for (int i = 0; i < TEST_ROUND; i++) { + fieldstat_hll_add_field(instance, cube_id, 0, cell_dimension[i]->get_tag(), cell_dimension[i]->get_tag_count(), items[i]->get_tag(), items[i]->get_tag_count()); + } + + clock_t end = clock(); + printf("time: %lf second\n", (double)(end - start) / CLOCKS_PER_SEC); + + fieldstat_free(instance); +} int main(int argc, char *argv[]) { testing::InitGoogleTest(&argc, argv); // testing::GTEST_FLAG(filter) = "*spreadsketch"; - testing::GTEST_FLAG(filter) = "Fuzz_test.simple_one_for_perf"; + testing::GTEST_FLAG(filter) = "-Fuzz_test.simple_one_for_perf*"; return RUN_ALL_TESTS(); }
\ No newline at end of file |
