summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-10 11:02:36 +0800
committerchenzizhan <[email protected]>2024-07-10 11:02:36 +0800
commite1fd771fc7e33ffd659535e81412179e8ac6929a (patch)
tree33faa31e3282d84c9a07d181a93676bf5acac21f
parentb711b50d356ffc09569d4f11ba2a0cae41045510 (diff)
spread sketch wipspreadsketch
-rw-r--r--include/fieldstat/fieldstat.h1
-rw-r--r--src/cube.c46
-rw-r--r--src/tags/spread_sketch.c127
-rw-r--r--test/utils.cpp147
-rw-r--r--test/utils.hpp23
5 files changed, 285 insertions, 59 deletions
diff --git a/include/fieldstat/fieldstat.h b/include/fieldstat/fieldstat.h
index 941b89c..b693d98 100644
--- a/include/fieldstat/fieldstat.h
+++ b/include/fieldstat/fieldstat.h
@@ -34,6 +34,7 @@ enum fs_tag_type
enum sampling_mode {
SAMPLING_MODE_COMPREHENSIVE,
SAMPLING_MODE_TOPK,
+ SAMPLING_MODE_SPREADSKETCH,
};
struct fieldstat_tag {
diff --git a/src/cube.c b/src/cube.c
index 21f3ef8..ff4b9ca 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -13,6 +13,7 @@
#include "metric.h"
#include "heavy_keeper.h"
#include "tag_map.h"
+#include "spread_sketch.h"
#define DEFAULT_N_METRIC 32
#define DEFAULT_N_CUBE 64
@@ -42,6 +43,7 @@ struct cube {
union {
struct heavy_keeper *topk;
struct hash_table *comprehensive;
+ struct spread_sketch *spread_sketch;
};
size_t max_n_cell;
@@ -503,6 +505,9 @@ struct cube *cube_new(const struct fieldstat_tag *shared_tags, size_t n_tag, enu
cube->comprehensive = hash_table_new(max_n_cell);
hash_table_set_exdata_schema(cube->comprehensive, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ cube->spread_sketch = spread_sketch_new(max_n_cell);
+ spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
default:
assert(0);
break;
@@ -520,6 +525,9 @@ void cube_free(struct cube *cube) {
case SAMPLING_MODE_COMPREHENSIVE:
hash_table_free(cube->comprehensive);
break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_free(cube->spread_sketch);
+ break;
default:
assert(0);
break;
@@ -532,10 +540,25 @@ void cube_free(struct cube *cube) {
}
void cube_reset(struct cube *cube) {
- if (cube->sampling_mode == SAMPLING_MODE_TOPK) {
+ // if (cube->sampling_mode == SAMPLING_MODE_TOPK) {
+ // heavy_keeper_reset(cube->topk);
+ // } else {
+ // hash_table_reset(cube->comprehensive);
+ // }
+ switch (cube->sampling_mode)
+ {
+ case SAMPLING_MODE_TOPK:
heavy_keeper_reset(cube->topk);
- } else {
+ break;
+ case SAMPLING_MODE_COMPREHENSIVE:
hash_table_reset(cube->comprehensive);
+ break;
+ case SAMPLING_MODE_SPREADSKETCH:
+ spread_sketch_reset(cube->spread_sketch);
+ break;
+ default:
+ assert(0);
+ break;
}
}
@@ -580,7 +603,24 @@ struct cell *get_cell(struct cube *cube, const struct fieldstat_tag *tags, size_
}
}
break;}
- }
+ case SAMPLING_MODE_SPREADSKETCH: {
+ if (cube->primary_metric_id != metric_id) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ // todo: spread sketch 没办法支持dummy 场景。首先,我不能让所有metric 都走spread sketch流程,
+ // 因为正常来说,用level=0 的hashy 做数值,没有任何意义,肯定都更新不了,只是在刚开始的时候,起一个记录key 的作用。
+ // 而,如果像是topk 那样,给count=0 的一席之地,那么存在问题:spread sketch本身不对记录的个数有限制,所以什么时候停止记录呢?这样的设计也太麻烦了。
+ // 之前跟老板讨论的时候,给了两个方案,方案1:做一个buffer,如果get exdata0 get 不到,则往buffer 中的cell 里写,等到来了primary以后,把cell 送进去。
+ // 方案2:简单略去第一轮添加时的情况。这会造成很少量的误差。不过,实际上这个操作不是逐包而是会话开始结束时来一次,所以误差也不会太小。必须是让贺岚风能第一个操作primary metric。
+ }
+ if (cell_data == NULL) {
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, tag_in_string, tag_len, 0, (void *)&args);
+ if (tmp_ret == 1) {
+ cell_data = spread_sketch_get0_exdata(cube->spread_sketch, tag_in_string, tag_len);
+ }
+ }
+ break;}
+ }
+
free(tag_in_string);
return cell_data;
diff --git a/src/tags/spread_sketch.c b/src/tags/spread_sketch.c
index 1a44d6a..c654e89 100644
--- a/src/tags/spread_sketch.c
+++ b/src/tags/spread_sketch.c
@@ -37,8 +37,18 @@ struct smart_ptr { // todo:entry
UT_hash_handle hh;
};
-struct smart_ptr_table {
- struct smart_ptr *table;
+struct spread_sketch_scheme {
+ exdata_new_cb new_fn;
+ exdata_free_cb free_fn;
+ exdata_merge_cb merge_fn;
+ exdata_reset_cb reset_fn;
+ exdata_copy_cb copy_fn;
+};
+
+struct smart_ptr_table { // TODO: entry table
+ struct smart_ptr *entry;
+
+ struct spread_sketch_scheme scheme;
};
struct bucket {
@@ -50,14 +60,13 @@ struct spread_sketch {
int depth;
int width;
- exdata_new_cb new_fn;
- exdata_free_cb free_fn;
- exdata_merge_cb merge_fn;
- exdata_reset_cb reset_fn;
- exdata_copy_cb copy_fn;
-
+ struct spread_sketch_scheme scheme;
+
struct bucket *buckets;
- struct group_table *groups;
+ struct smart_ptr_table *table;
+
+ uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。
+ // TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。
};
static void *default_new_fn(void *arg) {
@@ -87,9 +96,9 @@ static inline char *key_dup(const char *key, size_t key_len) {
return ret;
}
-const struct smart_ptr *smart_prt_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) {
+struct smart_ptr *smart_ptr_table_get(struct smart_ptr_table *table, const char *key, size_t key_len, void *arg) {
struct smart_ptr *ret;
- HASH_FIND(hh, table, key, key_len, ret);
+ HASH_FIND(hh, table->entry, key, key_len, ret);
if (ret != NULL) {
ret->ref_count++;
@@ -101,25 +110,25 @@ const struct smart_ptr *smart_prt_table_get(struct smart_ptr_table *table, const
if (arg == NULL) {
ret->exdata = NULL;
} else {
- ret->exdata = table->new_fn(arg);
+ ret->exdata = table->scheme.new_fn(arg);
}
- HASH_ADD_KEYPTR(hh, table->table, ret->key, ret->key_len, ret);
+ HASH_ADD_KEYPTR(hh, table->entry, ret->key, ret->key_len, ret);
}
return ret;
}
-int smart_prt_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) {
+int smart_ptr_table_release(struct smart_ptr_table *table, const char *key, size_t key_len) {
struct smart_ptr *ret;
- HASH_FIND(hh, table->table, key, key_len, ret);
+ HASH_FIND(hh, table->entry, key, key_len, ret);
if (ret == NULL) {
return -1;
}
ret->ref_count--;
if (ret->ref_count == 0) {
- HASH_DEL(table->table, ret);
- table->free_fn(ret->exdata);
+ HASH_DEL(table->entry, ret);
+ table->scheme.free_fn(ret->exdata);
free(ret->key);
free(ret);
}
@@ -127,26 +136,27 @@ int smart_prt_table_release(struct smart_ptr_table *table, const char *key, size
return 0;
}
-void smart_prt_table_free(struct smart_ptr_table *table) {
+void smart_ptr_table_free(struct smart_ptr_table *table) {
struct smart_ptr *current, *tmp;
- HASH_ITER(hh, table->table, current, tmp) {
- HASH_DEL(table->table, current);
- table->free_fn(current->exdata);
+ HASH_ITER(hh, table->entry, current, tmp) {
+ HASH_DEL(table->entry, current);
+ table->scheme.free_fn(current->exdata);
free(current->key);
free(current);
}
free(table);
}
-struct smart_ptr_table *smart_prt_table_new() {
+struct smart_ptr_table *smart_ptr_table_new() {
struct smart_ptr_table *table = malloc(sizeof(struct smart_ptr_table));
- table->table = NULL;
+ table->entry = NULL;
+
+ table->scheme.new_fn = default_new_fn;
+ table->scheme.free_fn = default_free_fn;
+ table->scheme.merge_fn = default_merge_fn;
+ table->scheme.reset_fn = default_reset_fn;
+ table->scheme.copy_fn = default_copy_fn;
- table->new_fn = default_new_fn;
- table->free_fn = default_free_fn;
- table->merge_fn = default_merge_fn;
- table->reset_fn = default_reset_fn;
- table->copy_fn = default_copy_fn;
return table;
}
@@ -172,7 +182,13 @@ struct spread_sketch *spread_sketch_new(int expected_query_num) {
get_parameter_recommendation(expected_query_num, &pthis->depth, &pthis->width);
pthis->buckets = calloc(pthis->depth * pthis->width, sizeof(struct bucket));
- pthis->table = smart_prt_table_new();
+ pthis->scheme.new_fn = default_new_fn;
+ pthis->scheme.free_fn = default_free_fn;
+ pthis->scheme.merge_fn = default_merge_fn;
+ pthis->scheme.reset_fn = default_reset_fn;
+ pthis->scheme.copy_fn = default_copy_fn;
+ pthis->table = smart_ptr_table_new();
+ pthis->table->scheme = pthis->scheme;
return pthis;
}
@@ -204,9 +220,9 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
uint32_t true_level = bucket->content == NULL ? 0: bucket->level;
if (true_level < level) {
- struct smart_ptr *content_old = bucket->content;
- smart_prt_table_release(ss->table, content_old->key, content_old->key_len, ss->free_fn);
- struct smart_ptr *content_new = smart_prt_table_get(ss->table, key, key_length, arg);
+ const struct smart_ptr *content_old = bucket->content;
+ smart_ptr_table_release(ss->table, content_old->key, content_old->key_len);
+ struct smart_ptr *content_new = smart_ptr_table_get(ss->table, key, key_length, arg);
bucket->content = content_new;
bucket->level = level;
@@ -219,7 +235,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
}
void spread_sketch_free(struct spread_sketch *ss) {
- smart_prt_table_free(ss->table, ss->free_fn);
+ smart_ptr_table_free(ss->table);
free(ss->buckets);
free(ss);
}
@@ -236,7 +252,7 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
continue;
}
if (bucket_dst->content == NULL) {
- bucket_dst->content = smart_prt_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
+ bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->level = bucket_src->level;
continue;
}
@@ -247,31 +263,33 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
}
} else {
if (bucket_src->level > bucket_dst->level) {
- smart_prt_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len);
- bucket_dst->content = smart_prt_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
+ smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len);
+ bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->level = bucket_src->level;
}
}
}
+ const struct spread_sketch_scheme *scheme = &dst->table->scheme;
+
struct smart_ptr *content_dest, *content_src, *tmp;
- HASH_ITER(hh, dst->table->table, content_dest, tmp) {
- HASH_FIND(hh, src->table->table, content_dest->key, content_dest->key_len, content_src);
+ HASH_ITER(hh, dst->table->entry, content_dest, tmp) {
+ HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src);
if (content_src == NULL) {
continue;
}
if (content_dest->exdata == NULL) {
- content_dest->exdata = dst->table->copy_fn(content_src->exdata);
+ content_dest->exdata = scheme->copy_fn(content_src->exdata);
} else {
- dst->table->merge_fn(content_dest->exdata, content_src->exdata);
+ scheme->merge_fn(content_dest->exdata, content_src->exdata);
}
}
}
void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) {
struct smart_ptr *content;
- HASH_FIND(hh, ss->table->table, key, key_len, content);
+ HASH_FIND(hh, ss->table->entry, key, key_len, content);
if (content == NULL) {
return NULL;
}
@@ -285,22 +303,23 @@ void spread_sketch_reset(struct spread_sketch *ss) {
}
struct smart_ptr *content, *tmp;
- HASH_ITER(hh, ss->table->table, content, tmp) {
- ss->reset_fn(content->exdata);
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ ss->scheme.reset_fn(content->exdata);
}
}
void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) {
- ss->table->new_fn = new_fn;
- ss->table->free_fn = free_fn;
- ss->table->merge_fn = merge_fn;
- ss->table->reset_fn = reset_fn;
- ss->table->copy_fn = copy_fn;
- return 0;
+ ss->scheme.new_fn = new_fn;
+ ss->scheme.free_fn = free_fn;
+ ss->scheme.merge_fn = merge_fn;
+ ss->scheme.reset_fn = reset_fn;
+ ss->scheme.copy_fn = copy_fn;
+
+ ss->table->scheme = ss->scheme;
}
int spread_sketch_get_count(const struct spread_sketch *ss) {
- return HASH_CNT(hh, ss->table->table);
+ return HASH_CNT(hh, ss->table->entry);
}
// 这个函数还是会忠实的把内部所有内容都拿出来,但是预期是最多expected_query_num 个,所以在外层要做一个排序。
@@ -308,7 +327,7 @@ int spread_sketch_get_count(const struct spread_sketch *ss) {
size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) {
size_t count = 0;
struct smart_ptr *content, *tmp;
- HASH_ITER(hh, ss->table->table, content, tmp) {
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
if (count >= n_exdatas) {
break;
}
@@ -323,16 +342,16 @@ struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
memcpy(dst, src, sizeof(struct spread_sketch));
dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket));
- dst->table = smart_prt_table_new();
+ dst->table = smart_ptr_table_new();
for (int i = 0; i < dst->depth * dst->width; i++) {
if (src->buckets[i].content == NULL) {
continue;
}
- dst->buckets[i].content = smart_prt_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL);
+ dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL);
dst->buckets[i].level = src->buckets[i].level;
if (dst->buckets[i].content->exdata == NULL) {
- dst->buckets[i].content->exdata = src->copy_fn(src->buckets[i].content->exdata);
+ dst->buckets[i].content->exdata = src->scheme.copy_fn(src->buckets[i].content->exdata);
}
}
return dst;
diff --git a/test/utils.cpp b/test/utils.cpp
index b53e4e4..4015dd4 100644
--- a/test/utils.cpp
+++ b/test/utils.cpp
@@ -8,7 +8,10 @@
#include <random>
#include <string.h>
#include <algorithm>
-
+#include <fstream>
+#include <math.h>
+#include <unistd.h>
+#include <sstream>
#include "fieldstat.h"
#include "utils.hpp"
@@ -293,3 +296,145 @@ double test_cal_topk_accuracy(vector<struct Fieldstat_tag_list_wrapper *> &test_
double accuracy = (double)correct / test_result.size();
return accuracy;
}
+
+
+//===========================================================================
+//= Function to generate Zipf (power law) distributed random variables =
+//= - Input: alpha and N =
+//= - Output: Returns with Zipf distributed random variable =
+//===========================================================================
+int zipf(double alpha, int n)
+{
+ static bool first = true; // Static first time flag
+ static double c = 0; // Normalization constant
+ double z; // Uniform random number (0 < z < 1)
+ double sum_prob; // Sum of probabilities
+ double zipf_value; // Computed exponential value to be returned
+ int i; // Loop counter
+
+ // Compute normalization constant on first call only
+ if (first)
+ {
+ for (i=1; i<=n; i++)
+ c = c + (1.0 / pow((double) i, alpha));
+ c = 1.0 / c;
+ first = false;
+ }
+
+ // Pull a uniform random number (0 < z < 1)
+ do
+ {
+ z = (double)rand() / (double)RAND_MAX;
+ }
+ while ((z == 0.0) || (z == 1.0));
+
+ // Map z to the value
+ sum_prob = 0;
+ for (i=1; i<=n; i++)
+ {
+ sum_prob = sum_prob + c / pow((double) i, alpha);
+ if (sum_prob >= z)
+ {
+ zipf_value = i;
+ break;
+ }
+ }
+
+ return(zipf_value);
+}
+
+
+// class SpreadSketchZipfGenerator {
+// private:
+// const int MAX_DATA = 1000000;
+// std::pair<std::string, std::string> *loadeds;
+// unsigned cursor;
+
+// public:
+// SpreadSketchZipfGenerator(double alpha, int n) {
+
+// }
+
+// struct Flow next() {
+// int r_cursor = cursor % MAX_DATA;
+// struct Flow flow;
+// flow.src_ip = loadeds[r_cursor].first;
+// flow.dst_ip = loadeds[r_cursor].second;
+
+// cursor++;
+
+// return flow;
+// }
+
+// ~SpreadSketchZipfGenerator() {
+// delete[] loadeds;
+// }
+
+// double _alpha;
+// int _n;
+// };
+
+SpreadSketchZipfGenerator::SpreadSketchZipfGenerator(double alpha, int n) {
+ _alpha = alpha;
+ _n = n;
+
+ // generate data and write them to file
+ std::string filename = "zipf_" + std::to_string(alpha) + "_" + std::to_string(n) + ".txt";
+
+ std::unordered_map<int, int> fanout_map; // src_ip_id -> fanout being used
+
+ if (access(filename.c_str(), F_OK) != 0) {
+ printf("file %s not found, generating data\n", filename.c_str());
+
+ std::ofstream file(filename);
+ if (!file.is_open()) {
+ printf("failed to open file %s\n", filename.c_str());
+ return;
+ }
+
+ for (int i = 0; i < MAX_DATA; i++) {
+ int src_id = zipf(alpha, n);
+ int fanout = fanout_map.find(src_id) == fanout_map.end() ? 0 : fanout_map[src_id];
+ fanout_map[src_id] = fanout + 1;
+
+ file << "s_" << src_id << " d_" << fanout << std::endl;
+ }
+
+ file.close();
+ printf("data generated and saved to file %s\n", filename.c_str());
+ }
+
+ // load data
+ std::ifstream file(filename);
+ if (!file.is_open()) {
+ printf("failed to open file %s\n", filename.c_str());
+ return;
+ }
+
+ loadeds = new std::pair<std::string, std::string>[MAX_DATA];
+ std::string line;
+ int i = 0;
+ while (std::getline(file, line) && i < MAX_DATA) {
+ std::istringstream iss(line);
+ std::string src_ip, dst_ip;
+ iss >> src_ip >> dst_ip;
+ loadeds[i] = std::make_pair(src_ip, dst_ip);
+ i++;
+ }
+ file.close();
+}
+
+SpreadSketchZipfGenerator::~SpreadSketchZipfGenerator() {
+ delete[] loadeds;
+}
+
+struct Flow SpreadSketchZipfGenerator::next() {
+ int r_cursor = cursor % MAX_DATA;
+ struct Flow flow;
+ flow.src_ip = loadeds[r_cursor].first;
+ flow.dst_ip = loadeds[r_cursor].second;
+
+ cursor++;
+
+ return flow;
+} \ No newline at end of file
diff --git a/test/utils.hpp b/test/utils.hpp
index 28dea2b..f758f1d 100644
--- a/test/utils.hpp
+++ b/test/utils.hpp
@@ -44,4 +44,25 @@ private:
double test_cal_topk_accuracy(std::vector<struct Fieldstat_tag_list_wrapper *> &test_result, std::unordered_map<std::string, int> &expected_count);
// after we change fieldstat_counter_get return a error code in, all the tests should change correspondingly, so just use a adapter aliasing the old function
-long long my_fieldstat_counter_get(const struct fieldstat *instance, int cube_id, int metric_id, int cell_id); \ No newline at end of file
+long long my_fieldstat_counter_get(const struct fieldstat *instance, int cube_id, int metric_id, int cell_id);
+
+
+struct Flow {
+ std::string src_ip;
+ std::string dst_ip;
+};
+
+class SpreadSketchZipfGenerator {
+private:
+ const int MAX_DATA = 1000000;
+ std::pair<std::string, std::string> *loadeds;
+ unsigned cursor;
+
+public:
+ SpreadSketchZipfGenerator(double alpha, int n);
+ struct Flow next();
+ ~SpreadSketchZipfGenerator();
+
+ double _alpha;
+ int _n;
+}; \ No newline at end of file