diff options
| -rw-r--r-- | include/fieldstat/fieldstat.h | 1 | ||||
| -rw-r--r-- | src/cube.c | 46 | ||||
| -rw-r--r-- | src/tags/spread_sketch.c | 127 | ||||
| -rw-r--r-- | test/utils.cpp | 147 | ||||
| -rw-r--r-- | test/utils.hpp | 23 |
5 files changed, 285 insertions, 59 deletions
diff --git a/include/fieldstat/fieldstat.h b/include/fieldstat/fieldstat.h index 941b89c..b693d98 100644 --- a/include/fieldstat/fieldstat.h +++ b/include/fieldstat/fieldstat.h @@ -34,6 +34,7 @@ enum fs_tag_type enum sampling_mode { SAMPLING_MODE_COMPREHENSIVE, SAMPLING_MODE_TOPK, + SAMPLING_MODE_SPREADSKETCH, }; struct fieldstat_tag { @@ -13,6 +13,7 @@ #include "metric.h" #include "heavy_keeper.h" #include "tag_map.h" +#include "spread_sketch.h" #define DEFAULT_N_METRIC 32 #define DEFAULT_N_CUBE 64 @@ -42,6 +43,7 @@ struct cube { union { struct heavy_keeper *topk; struct hash_table *comprehensive; + struct spread_sketch *spread_sketch; }; size_t max_n_cell; @@ -503,6 +505,9 @@ struct cube *cube_new(const struct fieldstat_tag *shared_tags, size_t n_tag, enu cube->comprehensive = hash_table_new(max_n_cell); hash_table_set_exdata_schema(cube->comprehensive, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); break; + case SAMPLING_MODE_SPREADSKETCH: + cube->spread_sketch = spread_sketch_new(max_n_cell); + spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i); default: assert(0); break; @@ -520,6 +525,9 @@ void cube_free(struct cube *cube) { case SAMPLING_MODE_COMPREHENSIVE: hash_table_free(cube->comprehensive); break; + case SAMPLING_MODE_SPREADSKETCH: + spread_sketch_free(cube->spread_sketch); + break; default: assert(0); break; @@ -532,10 +540,25 @@ void cube_free(struct cube *cube) { } void cube_reset(struct cube *cube) { - if (cube->sampling_mode == SAMPLING_MODE_TOPK) { + // if (cube->sampling_mode == SAMPLING_MODE_TOPK) { + // heavy_keeper_reset(cube->topk); + // } else { + // hash_table_reset(cube->comprehensive); + // } + switch (cube->sampling_mode) + { + case SAMPLING_MODE_TOPK: heavy_keeper_reset(cube->topk); - } else { + break; + case SAMPLING_MODE_COMPREHENSIVE: hash_table_reset(cube->comprehensive); + break; + case SAMPLING_MODE_SPREADSKETCH: + spread_sketch_reset(cube->spread_sketch); + break; + default: + assert(0); + break; } } @@ -580,7 +603,24 @@ struct cell *get_cell(struct cube *cube, const struct fieldstat_tag *tags, size_ } } break;} - } + case SAMPLING_MODE_SPREADSKETCH: { + if (cube->primary_metric_id != metric_id) { + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); + // todo: spread sketch 没办法支持dummy 场景。首先,我不能让所有metric 都走spread sketch流程, + // 因为正常来说,用level=0 的hashy 做数值,没有任何意义,肯定都更新不了,只是在刚开始的时候,起一个记录key 的作用。 + // 而,如果像是topk 那样,给count=0 的一席之地,那么存在问题:spread sketch本身不对记录的个数有限制,所以什么时候停止记录呢?这样的设计也太麻烦了。 + // 之前跟老板讨论的时候,给了两个方案,方案1:做一个buffer,如果get exdata0 get 不到,则往buffer 中的cell 里写,等到来了primary以后,把cell 送进去。 + // 方案2:简单略去第一轮添加时的情况。这会造成很少量的误差。不过,实际上这个操作不是逐包而是会话开始结束时来一次,所以误差也不会太小。必须是让贺岚风能第一个操作primary metric。 + } + if (cell_data == NULL) { + int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args); + if (tmp_ret == 1) { + cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len); + } + } + break;} + } + free(tag_in_string); return cell_data; diff --git a/src/tags/spread_sketch.c b/src/tags/spread_sketch.c index 1a44d6a..c654e89 100644 --- a/src/tags/spread_sketch.c +++ b/src/tags/spread_sketch.c @@ -37,8 +37,18 @@ struct smart_ptr { // todo:entry UT_hash_handle hh; }; -struct smart_ptr_table { - struct smart_ptr *table; +struct spread_sketch_scheme { + exdata_new_cb new_fn; + exdata_free_cb free_fn; + exdata_merge_cb merge_fn; + exdata_reset_cb reset_fn; + exdata_copy_cb copy_fn; +}; + +struct smart_ptr_table { // TODO: entry table + struct smart_ptr *entry; + + struct spread_sketch_scheme scheme; }; struct bucket { @@ -50,14 +60,13 @@ struct spread_sketch { int depth; int width; - exdata_new_cb new_fn; - exdata_free_cb free_fn; - exdata_merge_cb merge_fn; - exdata_reset_cb reset_fn; - exdata_copy_cb copy_fn; - + struct spread_sketch_scheme scheme; + struct bucket *buckets; - struct group_table *groups; + struct smart_ptr_table *table; + + uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。 + // TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。 }; static void *default_new_fn(void *arg) { @@ -87,9 +96,9 @@ static inline char *key_dup(const char *key, size_t key_len) { return ret; } -const struct smart_ptr *smart_prt_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) { +struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) { struct smart_ptr *ret; - HASH_FIND(hh, table, key, key_len, ret); + HASH_FIND(hh, table->entry, key, key_len, ret); if (ret != NULL) { ret->ref_count++; @@ -101,25 +110,25 @@ const struct smart_ptr *smart_prt_table_get(struct smart_ptr_table *table, const if (arg == NULL) { ret->exdata = NULL; } else { - ret->exdata = table->new_fn(arg); + ret->exdata = table->scheme.new_fn(arg); } - HASH_ADD_KEYPTR(hh, table->table, ret->key, ret->key_len, ret); + HASH_ADD_KEYPTR(hh, table->entry, ret->key, ret->key_len, ret); } return ret; } -int smart_prt_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) { +int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) { struct smart_ptr *ret; - HASH_FIND(hh, table->table, key, key_len, ret); + HASH_FIND(hh, table->entry, key, key_len, ret); if (ret == NULL) { return -1; } ret->ref_count--; if (ret->ref_count == 0) { - HASH_DEL(table->table, ret); - table->free_fn(ret->exdata); + HASH_DEL(table->entry, ret); + table->scheme.free_fn(ret->exdata); free(ret->key); free(ret); } @@ -127,26 +136,27 @@ int smart_prt_table_release(struct smart_ptr_table *table, const char *key, size return 0; } -void smart_prt_table_free(struct smart_ptr_table *table) { +void smart_ptr_table_free(struct smart_ptr_table *table) { struct smart_ptr *current, *tmp; - HASH_ITER(hh, table->table, current, tmp) { - HASH_DEL(table->table, current); - table->free_fn(current->exdata); + HASH_ITER(hh, table->entry, current, tmp) { + HASH_DEL(table->entry, current); + table->scheme.free_fn(current->exdata); free(current->key); free(current); } free(table); } -struct smart_ptr_table *smart_prt_table_new() { +struct smart_ptr_table *smart_ptr_table_new() { struct smart_ptr_table *table = malloc(sizeof(struct smart_ptr_table)); - table->table = NULL; + table->entry = NULL; + + table->scheme.new_fn = default_new_fn; + table->scheme.free_fn = default_free_fn; + table->scheme.merge_fn = default_merge_fn; + table->scheme.reset_fn = default_reset_fn; + table->scheme.copy_fn = default_copy_fn; - table->new_fn = default_new_fn; - table->free_fn = default_free_fn; - table->merge_fn = default_merge_fn; - table->reset_fn = default_reset_fn; - table->copy_fn = default_copy_fn; return table; } @@ -172,7 +182,13 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) { get_parameter_recommendation(expected_query_num, &pthis->depth, &pthis->width); pthis->buckets = calloc(pthis->depth * pthis->width, sizeof(struct bucket)); - pthis->table = smart_prt_table_new(); + pthis->scheme.new_fn = default_new_fn; + pthis->scheme.free_fn = default_free_fn; + pthis->scheme.merge_fn = default_merge_fn; + pthis->scheme.reset_fn = default_reset_fn; + pthis->scheme.copy_fn = default_copy_fn; + pthis->table = smart_ptr_table_new(); + pthis->table->scheme = pthis->scheme; return pthis; } @@ -204,9 +220,9 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng uint32_t true_level = bucket->content == NULL ? 0: bucket->level; if (true_level < level) { - struct smart_ptr *content_old = bucket->content; - smart_prt_table_release(ss->table, content_old->key, content_old->key_len, ss->free_fn); - struct smart_ptr *content_new = smart_prt_table_get(ss->table, key, key_length, arg); + const struct smart_ptr *content_old = bucket->content; + smart_ptr_table_release(ss->table, content_old->key, content_old->key_len); + struct smart_ptr *content_new = smart_ptr_table_get(ss->table, key, key_length, arg); bucket->content = content_new; bucket->level = level; @@ -219,7 +235,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng } void spread_sketch_free(struct spread_sketch *ss) { - smart_prt_table_free(ss->table, ss->free_fn); + smart_ptr_table_free(ss->table); free(ss->buckets); free(ss); } @@ -236,7 +252,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * continue; } if (bucket_dst->content == NULL) { - bucket_dst->content = smart_prt_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); + bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; continue; } @@ -247,31 +263,33 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch * } } else { if (bucket_src->level > bucket_dst->level) { - smart_prt_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len); - bucket_dst->content = smart_prt_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); + smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len); + bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); bucket_dst->level = bucket_src->level; } } } + const struct spread_sketch_scheme *scheme = &dst->table->scheme; + struct smart_ptr *content_dest, *content_src, *tmp; - HASH_ITER(hh, dst->table->table, content_dest, tmp) { - HASH_FIND(hh, src->table->table, content_dest->key, content_dest->key_len, content_src); + HASH_ITER(hh, dst->table->entry, content_dest, tmp) { + HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src); if (content_src == NULL) { continue; } if (content_dest->exdata == NULL) { - content_dest->exdata = dst->table->copy_fn(content_src->exdata); + content_dest->exdata = scheme->copy_fn(content_src->exdata); } else { - dst->table->merge_fn(content_dest->exdata, content_src->exdata); + scheme->merge_fn(content_dest->exdata, content_src->exdata); } } } void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) { struct smart_ptr *content; - HASH_FIND(hh, ss->table->table, key, key_len, content); + HASH_FIND(hh, ss->table->entry, key, key_len, content); if (content == NULL) { return NULL; } @@ -285,22 +303,23 @@ void spread_sketch_reset(struct spread_sketch *ss) { } struct smart_ptr *content, *tmp; - HASH_ITER(hh, ss->table->table, content, tmp) { - ss->reset_fn(content->exdata); + HASH_ITER(hh, ss->table->entry, content, tmp) { + ss->scheme.reset_fn(content->exdata); } } void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { - ss->table->new_fn = new_fn; - ss->table->free_fn = free_fn; - ss->table->merge_fn = merge_fn; - ss->table->reset_fn = reset_fn; - ss->table->copy_fn = copy_fn; - return 0; + ss->scheme.new_fn = new_fn; + ss->scheme.free_fn = free_fn; + ss->scheme.merge_fn = merge_fn; + ss->scheme.reset_fn = reset_fn; + ss->scheme.copy_fn = copy_fn; + + ss->table->scheme = ss->scheme; } int spread_sketch_get_count(const struct spread_sketch *ss) { - return HASH_CNT(hh, ss->table->table); + return HASH_CNT(hh, ss->table->entry); } // 这个函数还是会忠实的把内部所有内容都拿出来,但是预期是最多expected_query_num 个,所以在外层要做一个排序。 @@ -308,7 +327,7 @@ int spread_sketch_get_count(const struct spread_sketch *ss) { size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) { size_t count = 0; struct smart_ptr *content, *tmp; - HASH_ITER(hh, ss->table->table, content, tmp) { + HASH_ITER(hh, ss->table->entry, content, tmp) { if (count >= n_exdatas) { break; } @@ -323,16 +342,16 @@ struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { memcpy(dst, src, sizeof(struct spread_sketch)); dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket)); - dst->table = smart_prt_table_new(); + dst->table = smart_ptr_table_new(); for (int i = 0; i < dst->depth * dst->width; i++) { if (src->buckets[i].content == NULL) { continue; } - dst->buckets[i].content = smart_prt_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL); + dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL); dst->buckets[i].level = src->buckets[i].level; if (dst->buckets[i].content->exdata == NULL) { - dst->buckets[i].content->exdata = src->copy_fn(src->buckets[i].content->exdata); + dst->buckets[i].content->exdata = src->scheme.copy_fn(src->buckets[i].content->exdata); } } return dst; diff --git a/test/utils.cpp b/test/utils.cpp index b53e4e4..4015dd4 100644 --- a/test/utils.cpp +++ b/test/utils.cpp @@ -8,7 +8,10 @@ #include <random> #include <string.h> #include <algorithm> - +#include <fstream> +#include <math.h> +#include <unistd.h> +#include <sstream> #include "fieldstat.h" #include "utils.hpp" @@ -293,3 +296,145 @@ double test_cal_topk_accuracy(vector<struct Fieldstat_tag_list_wrapper *> &test_ double accuracy = (double)correct / test_result.size(); return accuracy; } + + +//=========================================================================== +//= Function to generate Zipf (power law) distributed random variables = +//= - Input: alpha and N = +//= - Output: Returns with Zipf distributed random variable = +//=========================================================================== +int zipf(double alpha, int n) +{ + static bool first = true; // Static first time flag + static double c = 0; // Normalization constant + double z; // Uniform random number (0 < z < 1) + double sum_prob; // Sum of probabilities + double zipf_value; // Computed exponential value to be returned + int i; // Loop counter + + // Compute normalization constant on first call only + if (first) + { + for (i=1; i<=n; i++) + c = c + (1.0 / pow((double) i, alpha)); + c = 1.0 / c; + first = false; + } + + // Pull a uniform random number (0 < z < 1) + do + { + z = (double)rand() / (double)RAND_MAX; + } + while ((z == 0.0) || (z == 1.0)); + + // Map z to the value + sum_prob = 0; + for (i=1; i<=n; i++) + { + sum_prob = sum_prob + c / pow((double) i, alpha); + if (sum_prob >= z) + { + zipf_value = i; + break; + } + } + + return(zipf_value); +} + + +// class SpreadSketchZipfGenerator { +// private: +// const int MAX_DATA = 1000000; +// std::pair<std::string, std::string> *loadeds; +// unsigned cursor; + +// public: +// SpreadSketchZipfGenerator(double alpha, int n) { + +// } + +// struct Flow next() { +// int r_cursor = cursor % MAX_DATA; +// struct Flow flow; +// flow.src_ip = loadeds[r_cursor].first; +// flow.dst_ip = loadeds[r_cursor].second; + +// cursor++; + +// return flow; +// } + +// ~SpreadSketchZipfGenerator() { +// delete[] loadeds; +// } + +// double _alpha; +// int _n; +// }; + +SpreadSketchZipfGenerator::SpreadSketchZipfGenerator(double alpha, int n) { + _alpha = alpha; + _n = n; + + // generate data and write them to file + std::string filename = "zipf_" + std::to_string(alpha) + "_" + std::to_string(n) + ".txt"; + + std::unordered_map<int, int> fanout_map; // src_ip_id -> fanout being used + + if (access(filename.c_str(), F_OK) != 0) { + printf("file %s not found, generating data\n", filename.c_str()); + + std::ofstream file(filename); + if (!file.is_open()) { + printf("failed to open file %s\n", filename.c_str()); + return; + } + + for (int i = 0; i < MAX_DATA; i++) { + int src_id = zipf(alpha, n); + int fanout = fanout_map.find(src_id) == fanout_map.end() ? 0 : fanout_map[src_id]; + fanout_map[src_id] = fanout + 1; + + file << "s_" << src_id << " d_" << fanout << std::endl; + } + + file.close(); + printf("data generated and saved to file %s\n", filename.c_str()); + } + + // load data + std::ifstream file(filename); + if (!file.is_open()) { + printf("failed to open file %s\n", filename.c_str()); + return; + } + + loadeds = new std::pair<std::string, std::string>[MAX_DATA]; + std::string line; + int i = 0; + while (std::getline(file, line) && i < MAX_DATA) { + std::istringstream iss(line); + std::string src_ip, dst_ip; + iss >> src_ip >> dst_ip; + loadeds[i] = std::make_pair(src_ip, dst_ip); + i++; + } + file.close(); +} + +SpreadSketchZipfGenerator::~SpreadSketchZipfGenerator() { + delete[] loadeds; +} + +struct Flow SpreadSketchZipfGenerator::next() { + int r_cursor = cursor % MAX_DATA; + struct Flow flow; + flow.src_ip = loadeds[r_cursor].first; + flow.dst_ip = loadeds[r_cursor].second; + + cursor++; + + return flow; +}
\ No newline at end of file diff --git a/test/utils.hpp b/test/utils.hpp index 28dea2b..f758f1d 100644 --- a/test/utils.hpp +++ b/test/utils.hpp @@ -44,4 +44,25 @@ private: double test_cal_topk_accuracy(std::vector<struct Fieldstat_tag_list_wrapper *> &test_result, std::unordered_map<std::string, int> &expected_count); // after we change fieldstat_counter_get return a error code in, all the tests should change correspondingly, so just use a adapter aliasing the old function -long long my_fieldstat_counter_get(const struct fieldstat *instance, int cube_id, int metric_id, int cell_id);
\ No newline at end of file +long long my_fieldstat_counter_get(const struct fieldstat *instance, int cube_id, int metric_id, int cell_id); + + +struct Flow { + std::string src_ip; + std::string dst_ip; +}; + +class SpreadSketchZipfGenerator { +private: + const int MAX_DATA = 1000000; + std::pair<std::string, std::string> *loadeds; + unsigned cursor; + +public: + SpreadSketchZipfGenerator(double alpha, int n); + struct Flow next(); + ~SpreadSketchZipfGenerator(); + + double _alpha; + int _n; +};
\ No newline at end of file |
