summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-15 18:15:52 +0800
committerchenzizhan <[email protected]>2024-07-15 18:15:52 +0800
commit482d4ce120cfa93105991c5edaa3105848a188fe (patch)
tree8b53ee4665b97a4ccab5de5579f0f49448a9c054
parent18250bfcca305840c82eb79ad85aae3932565793 (diff)
faster spreadsketch by min level
-rw-r--r--src/cells/spread_sketch.c47
-rw-r--r--test/test_fuzz_test.cpp36
2 files changed, 73 insertions, 10 deletions
diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c
index 8ad7c74..a05d2bd 100644
--- a/src/cells/spread_sketch.c
+++ b/src/cells/spread_sketch.c
@@ -51,9 +51,8 @@ struct spread_sketch {
struct bucket *buckets;
struct entry_table *table;
- uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。
- // TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。
- // 或者简单记录用掉的bucket 数量也挺好。
+ int level_cnt[65]; // 64: the level range is [1,65] // count every level number。
+ uint32_t level_min; // the minimum level in the sketch
};
static void *default_new_fn(void *arg) {
@@ -170,6 +169,7 @@ void get_parameter_recommendation(int max_super_spreader_number, int *depth_out,
struct spread_sketch *spread_sketch_new(int expected_query_num) {
struct spread_sketch *pthis = malloc(sizeof(struct spread_sketch));
+
get_parameter_recommendation(expected_query_num, &pthis->depth, &pthis->width);
pthis->buckets = calloc(pthis->depth * pthis->width, sizeof(struct bucket));
@@ -181,15 +181,37 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) {
pthis->table = smart_ptr_table_new();
pthis->table->scheme = pthis->scheme;
+ pthis->level_min = 0;
+ memset(pthis->level_cnt, 0, sizeof(pthis->level_cnt));
+ pthis->level_cnt[0] = pthis->depth * pthis->width;
+
return pthis;
}
+void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32_t level_new) {
+ ss->level_cnt[level_old]--;
+ ss->level_cnt[level_new]++;
+ while (ss->level_cnt[ss->level_min] == 0) {
+ ss->level_min++;
+ }
+}
+
// return 0 if not added, return 1 if added
int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg) {
- // uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171);
uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
- // printf("spread_sketch_add key %s, level %u\n", key, level);
-
+ if (level <= ss->level_min) {
+ if (item_hash == DUMMY_ITEM_HASH) {
+ return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch
+ }
+
+ struct entry *content;
+ HASH_FIND(hh, ss->table->entry, key, key_length, content);
+ if (content != NULL && !content->dying) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
// https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x)
// Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions.
@@ -207,6 +229,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
bucket->content->dying = false;
if (bucket->level < level) {
+ min_level_state_update(ss, bucket->level, level);
bucket->level = level;
}
in_sketch = true;
@@ -214,6 +237,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
uint32_t old_level = bucket->content == NULL ? 0: bucket->level;
if (level > old_level) {
+ min_level_state_update(ss, old_level, level);
// printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, old_level, level, i, hash_x % ss->width);
const struct entry *content_old = bucket->content;
if (content_old != NULL) {
@@ -251,19 +275,23 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
if (bucket_dst->content == NULL) {
bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->level = bucket_src->level;
+ min_level_state_update(dst, 0, bucket_src->level);
continue;
}
bucket_dst->content->dying = false;
if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) {
if (bucket_src->level > bucket_dst->level) {
+ min_level_state_update(dst, bucket_dst->level, bucket_src->level);
bucket_dst->level = bucket_src->level;
}
} else {
if (bucket_src->level > bucket_dst->level) {
+ min_level_state_update(dst, bucket_dst->level, bucket_src->level);
+ bucket_dst->level = bucket_src->level;
+
smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len);
bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
- bucket_dst->level = bucket_src->level;
}
}
}
@@ -305,6 +333,9 @@ void spread_sketch_reset(struct spread_sketch *ss) {
ss->scheme.reset_fn(content->exdata);
content->dying = true;
}
+
+ memset(ss->level_cnt, 0, sizeof(ss->level_cnt));
+ ss->level_cnt[0] = ss->depth * ss->width;
}
void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) {
@@ -347,7 +378,7 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t
struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
struct spread_sketch *dst = malloc(sizeof(struct spread_sketch));
- memcpy(dst, src, sizeof(struct spread_sketch));
+ memcpy(dst, src, sizeof(struct spread_sketch)); // copy the depth, width, level_cnt, level_min
dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
dst->table = smart_ptr_table_new();
diff --git a/test/test_fuzz_test.cpp b/test/test_fuzz_test.cpp
index 8989de7..e9cd3b3 100644
--- a/test/test_fuzz_test.cpp
+++ b/test/test_fuzz_test.cpp
@@ -434,6 +434,7 @@ TEST(Fuzz_test, many_instance_random_flow_unregister_calibrate_reset_fork_merge_
}
double accuracy = test_cal_topk_accuracy(test_result, count_map[Fieldstat_tag_list_wrapper(shared_tag_out).to_string()]);
+ // printf("spreadsketch accuracy: %lf\n", accuracy);
EXPECT_GE(accuracy, 0.7);
for (size_t j = 0; j < cell_num; j++) {
@@ -501,7 +502,7 @@ TEST(Fuzz_test, add_and_reset_with_randomly_generated_flows_and_randomly_chosen_
}
-TEST(Fuzz_test, simple_one_for_perf)
+TEST(Fuzz_test, simple_one_for_perf_topk)
{
const int CUBE_NUM = 5;
const int FLOW_NUM = 50000;
@@ -556,12 +557,43 @@ TEST(Fuzz_test, simple_one_for_perf)
fieldstat_free(master);
}
+TEST(Fuzz_test, simple_one_for_perf_spreadsketch)
+{
+ const int CELL_MAX = 100;
+ const int TEST_ROUND = 500000;
+ struct fieldstat *instance = fieldstat_new();
+
+ int cube_id = fieldstat_create_cube(instance, &TEST_TAG_STRING, 1, SAMPLING_MODE_TOP_CARDINALITY, CELL_MAX);
+ fieldstat_register_hll(instance, cube_id, "hll", 6);
+
+ SpreadSketchZipfGenerator generator(1.0, CELL_MAX * 10);
+ Fieldstat_tag_list_wrapper *cell_dimension[TEST_ROUND];
+ Fieldstat_tag_list_wrapper *items[TEST_ROUND];
+ for (int i = 0; i < TEST_ROUND; i++) {
+ Flow flow = generator.next();
+ cell_dimension[i] = new Fieldstat_tag_list_wrapper("src_ip", flow.src_ip.c_str());
+ items[i] = new Fieldstat_tag_list_wrapper("dst_ip", flow.dst_ip.c_str());
+ }
+
+ clock_t start = clock();
+ printf("press any key to start \n");
+ getchar();
+
+ for (int i = 0; i < TEST_ROUND; i++) {
+ fieldstat_hll_add_field(instance, cube_id, 0, cell_dimension[i]->get_tag(), cell_dimension[i]->get_tag_count(), items[i]->get_tag(), items[i]->get_tag_count());
+ }
+
+ clock_t end = clock();
+ printf("time: %lf second\n", (double)(end - start) / CLOCKS_PER_SEC);
+
+ fieldstat_free(instance);
+}
int main(int argc, char *argv[])
{
testing::InitGoogleTest(&argc, argv);
// testing::GTEST_FLAG(filter) = "*spreadsketch";
- testing::GTEST_FLAG(filter) = "Fuzz_test.simple_one_for_perf";
+ testing::GTEST_FLAG(filter) = "-Fuzz_test.simple_one_for_perf*";
return RUN_ALL_TESTS();
} \ No newline at end of file