summaryrefslogtreecommitdiff
path: root/src/cells
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-07-17 10:35:24 +0800
committerchenzizhan <[email protected]>2024-07-17 10:35:24 +0800
commit6595cbbde1280b6c7d3c445697e39aa18fa9741f (patch)
tree0fe9af32b13926b6aa8682337b500012ddecafca /src/cells
parentc488da1f8346baf8d5a0260da9c5934c8dfdfbef (diff)
primary metric in spreadsketch/heavykeeper
Diffstat (limited to 'src/cells')
-rw-r--r--src/cells/hll_common.c35
-rw-r--r--src/cells/hll_common.h1
-rw-r--r--src/cells/spread_sketch.c100
-rw-r--r--src/cells/spread_sketch.h25
4 files changed, 118 insertions, 43 deletions
diff --git a/src/cells/hll_common.c b/src/cells/hll_common.c
index 7017242..47dc113 100644
--- a/src/cells/hll_common.c
+++ b/src/cells/hll_common.c
@@ -2,6 +2,8 @@
#include "hll_common.h"
#include "crdt_utils.h"
+#include "base64/b64.h"
+
#include <string.h>
#include <math.h>
#include <assert.h>
@@ -298,7 +300,7 @@ void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, un
long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time)
{
long long reset_time_slot_us=RESET_TIME_SLOT_US(time_window_ms, precision);
- long long delta_us=timeval_delta_us(*reset_time, now);
+ long long delta_us=timeval_delta_us((*reset_time), now);
if(delta_us < reset_time_slot_us)
return 0;
@@ -349,3 +351,34 @@ int hll_should_slide(unsigned char precision, long long time_window_ms, struct t
return 1;
return 0;
}
+
+/*
+ * Serialize HLL registers into a base64 string.
+ *
+ * Layout of the raw buffer before encoding:
+ *   [version: 1 byte][precision: 1 byte][register words, 4 bytes each, most significant byte first]
+ *
+ * registers: array of at least INT_CEIL(NUM_REG(precision), REG_PER_WORD) words.
+ * precision: HLL precision; also written into the header byte.
+ * blob:      receives the string returned by b64_encode(), or NULL if the encoder returned NULL.
+ * blob_sz:   receives strlen(*blob), or 0 when *blob is NULL.
+ *
+ * NOTE(review): the buffer is sized with hll_size(precision) + 2 while the loop writes
+ * 4 bytes per word; this assumes hll_size(precision) >= words * 4 -- confirm against hll_size().
+ */
+void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz)
+{
+    size_t num_reg = NUM_REG(precision);
+    size_t words = INT_CEIL(num_reg, REG_PER_WORD);
+
+    // [version][precision][data...]; + 2 for version and precision
+    size_t sz = hll_size(precision) + 2;
+    unsigned char *buffer = ALLOC(unsigned char, sz);
+    size_t offset = 0;
+
+    const unsigned char version = 1;
+    buffer[offset++] = version;
+    buffer[offset++] = precision;
+
+    // Emit every word most-significant byte first so the blob does not depend on host endianness.
+    for (size_t i = 0; i < words; i++) {
+        uint32_t word = registers[i];
+        buffer[offset++] = (unsigned char)(word >> 24);
+        buffer[offset++] = (unsigned char)(word >> 16);
+        buffer[offset++] = (unsigned char)(word >> 8);
+        buffer[offset++] = (unsigned char)word;
+    }
+
+    char *enc = b64_encode(buffer, sz);
+    free(buffer);
+
+    // Never call strlen() on a NULL pointer: report an empty result instead.
+    *blob = enc;
+    *blob_sz = (enc != NULL) ? strlen(enc) : 0;
+} \ No newline at end of file
diff --git a/src/cells/hll_common.h b/src/cells/hll_common.h
index a110153..b62530a 100644
--- a/src/cells/hll_common.h
+++ b/src/cells/hll_common.h
@@ -21,6 +21,7 @@ void hll_reset_registers(uint32_t *registers, unsigned char precision, long long
long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time);
void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, unsigned char precision);
void hll_merge(uint32_t *dst_registers, const uint32_t *src_registers, unsigned char precision);
+void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz);
#ifdef __cplusplus
}
diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c
index 96436e3..fd1c06d 100644
--- a/src/cells/spread_sketch.c
+++ b/src/cells/spread_sketch.c
@@ -109,7 +109,6 @@ static void *default_copy_fn(void *exdata) {
return exdata;
}
-
uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) {
/*
return f(t), the actual level of bucket, which satisfy:
@@ -117,7 +116,11 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon
2. f(t0 + T) = 1
3. f((t0) = L )
*/
- struct bucket *bucket = &ss->buckets[bucket_idx];
+ struct bucket *bucket = &ss->buckets[bucket_idx];
+ if (ss->time_window_ms == 0) {
+ return bucket->level;
+ }
+
unsigned L = bucket->level;
long long T = ss->time_window_ms;
long long t0 = bucket->last_update_ms;
@@ -248,7 +251,11 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec
return pthis;
}
-void move_registers_forward(struct spread_sketch *ss, struct timeval *now) {
+void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) {
+ if (ss->time_window_ms == 0) {
+ return;
+ }
+
long long reset_reg_count = hll_get_reset_register_count(ss->precision, ss->time_window_ms, *now, &ss->reset_time);
if (reset_reg_count > 0) {
for (int i = 0; i < ss->depth * ss->width; i++) {
@@ -267,12 +274,14 @@ void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32
}
// return 0 if not added, return 1 if added
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, , struct timeval now) {
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) {
uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
if (level <= ss->level_min) {
- if (item_hash == DUMMY_ITEM_HASH) {
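+ if (item_hash == DUMMY_ITEM_HASH) { // TODO: the logic here is inconsistent: returning 1 should simply mean "the key is in the sketch", but a special case was added to work around the get-exdata problem.
+ // Normally add_sketch should always be called, whether primary or not, following the same logic, and exdata simply should not be fetched when the key is not in the sketch.
+ // todo: not quite -- if the HLL lives inside the sketch, the HLL update must always happen; this branch only distinguishes the "update the HLL" case from the "do not update the HLL" case.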
return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch
}
@@ -311,7 +320,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms);
if (level > true_level) {
- min_level_state_update(ss, old_level, level);
+ min_level_state_update(ss, true_level, level);
// printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width);
const struct entry *content_old = bucket->content;
if (content_old != NULL) {
@@ -332,6 +341,11 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
return in_sketch ? 1 : 0;
}
+/*
+ * Convenience wrapper around spread_sketch_add_hash(): hashes the raw item bytes
+ * with XXH3_64bits() and forwards every other argument unchanged.
+ * Returns whatever spread_sketch_add_hash() returns.
+ */
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now) {
+    return spread_sketch_add_hash(ss, key, key_length, XXH3_64bits(item, item_len), arg, now);
+}
+
double spread_sketch_point_query(const struct spread_sketch *ss, const char *key, size_t key_length) {
uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171);
uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
@@ -350,7 +364,7 @@ double spread_sketch_point_query(const struct spread_sketch *ss, const char *key
return count_min;
}
-struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct timeval *now) {
+struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, const struct timeval *now) {
struct spread_sketch *duplicate = malloc(sizeof(struct spread_sketch));
duplicate->depth = ss->depth;
duplicate->width = ss->width;
@@ -367,17 +381,6 @@ struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct
return duplicate;
}
-double spread_sketch_query(const struct spread_sketch *ss, const char *key, size_t key_length, struct timeval now) {
- if (hll_should_slide(ss->precision, ss->time_window_ms, now, ss->reset_time)) {
- struct spread_sketch *duplicate = duplicate_and_step(ss, &now);
- double ret = spread_sketch_point_query(duplicate, key, key_length);
- spread_sketch_free(duplicate);
- return ret;
- } else {
- return spread_sketch_point_query(ss, key, key_length);
- }
-}
-
void spread_sketch_free(struct spread_sketch *ss) {
smart_ptr_table_free(ss->table);
for (int i = 0; i < ss->depth * ss->width; i++) {
@@ -401,12 +404,13 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
continue;
}
+ hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision);
+
if (bucket_dst->content == NULL) {
bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->level = bucket_src->level;
bucket_dst->last_update_ms = bucket_src->last_update_ms;
- bucket_dst->sthll_register = hll_duplicate(bucket_src->sthll_register, dst->precision);
min_level_state_update(dst, 0, bucket_src->level);
continue;
}
@@ -431,8 +435,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
bucket_dst->last_update_ms = bucket_src->last_update_ms;
}
}
-
- hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision);
}
// for exdata
@@ -518,8 +520,53 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t
return count;
}
-double spread_sketch_point_query
+/*
+ * List the keys held in the sketch's entry table, skipping entries flagged as dying.
+ *
+ * keys:     receives a malloc'd array of key pointers. The pointers are the entries'
+ *           own key fields (not copies), so they alias storage owned by the sketch.
+ * key_lens: receives a malloc'd array with the matching key lengths.
+ * n_keys:   receives the number of valid slots in both arrays.
+ *
+ * When the table is empty or an allocation fails, *keys and *key_lens are set to NULL
+ * and *n_keys to 0.
+ */
+void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys) {
+    size_t count_max = HASH_COUNT(ss->table->entry);
+
+    *keys = NULL;
+    *key_lens = NULL;
+    *n_keys = 0;
+    if (count_max == 0) {
+        return;
+    }
+
+    char **key_arr = malloc(count_max * sizeof(*key_arr));
+    size_t *len_arr = malloc(count_max * sizeof(*len_arr));
+    if (key_arr == NULL || len_arr == NULL) {
+        free(key_arr);
+        free(len_arr);
+        return;
+    }
+
+    size_t count = 0;
+    struct entry *content, *tmp;
+    HASH_ITER(hh, ss->table->entry, content, tmp) {
+        if (content->dying) {
+            continue;
+        }
+
+        key_arr[count] = content->key;
+        len_arr[count] = content->key_len;
+        count++;
+    }
+
+    *keys = key_arr;
+    *key_lens = len_arr;
+    // Report how many slots were actually filled; dying entries make this <= count_max.
+    *n_keys = count;
+}
+
+/*
+ * Cardinality estimate for `key`: delegates directly to spread_sketch_point_query()
+ * and returns its result unchanged.
+ */
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) {
+    return spread_sketch_point_query(ss, key, key_len);
+}
+/*
+ * Base64-serialize the HLL registers of the bucket that yields the smallest
+ * hll_count() estimate among the `depth` buckets that `key` maps to (one per row).
+ *
+ * Returns the string that hll_encode_into_base64() stores in its blob output,
+ * or NULL when the sketch has no rows (depth <= 0).
+ * NOTE(review): the returned string presumably has to be released by the caller -- confirm.
+ */
+char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len) {
+    uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_len, 171);
+    uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
+    uint32_t hash_x2 = (uint32_t) hash_x_tmp;
+    const uint32_t *register_ret = NULL;
+    double min_cardinality = 0.0;
+
+    for (int i = 0; i < ss->depth; i++) {
+        uint32_t hash_x = hash_x1 + i * hash_x2;
+        int bucket_idx = (hash_x % ss->width) + i * ss->width;
+
+        double est = hll_count(ss->buckets[bucket_idx].sthll_register, ss->precision, ss->reset_idx, ss->time_window_ms);
+        // Seed with the first row so a bucket is always selected, however large the estimates are.
+        if (register_ret == NULL || est < min_cardinality) {
+            min_cardinality = est;
+            register_ret = ss->buckets[bucket_idx].sthll_register;
+        }
+    }
+
+    if (register_ret == NULL) {
+        // No row was visited: there is nothing to encode.
+        return NULL;
+    }
+
+    char *enc = NULL;
+    size_t enc_len = 0; // required by the encoder's signature; the length is not returned to the caller
+    hll_encode_into_base64(register_ret, ss->precision, &enc, &enc_len);
+    return enc;
+}
struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
struct spread_sketch *dst = malloc(sizeof(struct spread_sketch));
@@ -549,10 +596,10 @@ void spread_sketch_get_parameter_recommendation(int expected_super_spreader_numb
*depth_out = logk;
int w;
- if (max_super_spreader_number <= 100) {
- w = max_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width
+ if (expected_super_spreader_number <= 100) {
+ w = expected_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width
} else {
- w = max_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous
+ w = expected_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous
}
if (w < 40) {
w = 40;
@@ -578,9 +625,6 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec
ss->precision = precision;
}
-void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz) {
- hyperloglog_serialize_for_networking(data->hll, blob, blob_size);
-}
// todo: serialize(swarm kv)
// todo: implement reset and copy on top of swarm kv. \ No newline at end of file
diff --git a/src/cells/spread_sketch.h b/src/cells/spread_sketch.h
index baec128..b1c7a34 100644
--- a/src/cells/spread_sketch.h
+++ b/src/cells/spread_sketch.h
@@ -1,41 +1,38 @@
#pragma once
-#include <stddef.h>
+
#ifdef __cplusplus
extern "C"{
#endif
#include "exdata.h"
+#include <stdint.h> //uint64_t
+#include <sys/time.h> // struct timeval
+#include <stddef.h>
#define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0
struct spread_sketch;
-
struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now);
-
void spread_sketch_free(struct spread_sketch *ss);
void spread_sketch_reset(struct spread_sketch *ss);
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now);
-// void spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg);
-// TODO: 增加add_hash 接口
-void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now);
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now);
-void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
+void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
// get the number of cells in the heavy keeper
int spread_sketch_get_count(const struct spread_sketch *ss);
-size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas);
-// 一系列对于spread sketch的查询
-// size_t spread_sketch_list_keys(const struct spread_sketch *ss, const char *keys[], size_t n_keys);
-// const uint32_t spread_sketch_query_register(ss, const char *key, size_t key_len);
-
-// size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas);
+void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys);
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len);
+char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len);
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src);