summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cells/hll_common.c35
-rw-r--r--src/cells/hll_common.h1
-rw-r--r--src/cells/spread_sketch.c100
-rw-r--r--src/cells/spread_sketch.h25
-rw-r--r--src/cube.c134
-rw-r--r--src/cube.h2
-rw-r--r--src/exporter/cjson_exporter.c2
-rw-r--r--src/fieldstat.c4
-rw-r--r--src/metrics/hyperloglog.c33
-rw-r--r--src/metrics/hyperloglog.h2
-rw-r--r--src/metrics/metric.c2
11 files changed, 242 insertions, 98 deletions
diff --git a/src/cells/hll_common.c b/src/cells/hll_common.c
index 7017242..47dc113 100644
--- a/src/cells/hll_common.c
+++ b/src/cells/hll_common.c
@@ -2,6 +2,8 @@
#include "hll_common.h"
#include "crdt_utils.h"
+#include "base64/b64.h"
+
#include <string.h>
#include <math.h>
#include <assert.h>
@@ -298,7 +300,7 @@ void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, un
long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time)
{
long long reset_time_slot_us=RESET_TIME_SLOT_US(time_window_ms, precision);
- long long delta_us=timeval_delta_us(*reset_time, now);
+ long long delta_us=timeval_delta_us((*reset_time), now);
if(delta_us < reset_time_slot_us)
return 0;
@@ -349,3 +351,34 @@ int hll_should_slide(unsigned char precision, long long time_window_ms, struct t
return 1;
return 0;
}
+
+void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz)
+{
+ size_t sz=0, offset=0;
+ size_t num_reg = NUM_REG(precision);
+ size_t words = INT_CEIL(num_reg, REG_PER_WORD);
+
+ sz = hll_size(precision) + 2; // [version][precision][data..., + 2 for version and precision
+ unsigned char *buffer = ALLOC(unsigned char, sz);
+
+ const unsigned char version = 1;
+ memcpy(buffer+offset, &version, 1);
+ offset += 1;
+
+ memcpy(buffer+offset, &precision, 1);
+ offset += 1;
+
+ for (int i = 0; i < words; i++) {
+ uint32_t word = registers[i];
+ buffer[offset++] = word >> 24;
+ buffer[offset++] = word >> 16;
+ buffer[offset++] = word >> 8;
+ buffer[offset++] = word;
+ }
+
+ char *enc = b64_encode(buffer, sz);
+ free(buffer);
+ *blob_sz = strlen(enc);
+ *blob = enc;
+ return;
+} \ No newline at end of file
diff --git a/src/cells/hll_common.h b/src/cells/hll_common.h
index a110153..b62530a 100644
--- a/src/cells/hll_common.h
+++ b/src/cells/hll_common.h
@@ -21,6 +21,7 @@ void hll_reset_registers(uint32_t *registers, unsigned char precision, long long
long long hll_get_reset_register_count(unsigned char precision, long long time_window_ms, struct timeval now, struct timeval *reset_time);
void hll_advance_reset_index(long long *reset_idx, long long reset_reg_count, unsigned char precision);
void hll_merge(uint32_t *dst_registers, const uint32_t *src_registers, unsigned char precision);
+void hll_encode_into_base64(const uint32_t *registers, unsigned char precision, char **blob, size_t *blob_sz);
#ifdef __cplusplus
}
diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c
index 96436e3..fd1c06d 100644
--- a/src/cells/spread_sketch.c
+++ b/src/cells/spread_sketch.c
@@ -109,7 +109,6 @@ static void *default_copy_fn(void *exdata) {
return exdata;
}
-
uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long long t) {
/*
return f(t), the actual level of bucket, which satisfy:
@@ -117,7 +116,11 @@ uint32_t cal_true_level(const struct spread_sketch *ss, int bucket_idx, long lon
2. f(t0 + T) = 1
3. f((t0) = L )
*/
- struct bucket *bucket = &ss->buckets[bucket_idx];
+ struct bucket *bucket = &ss->buckets[bucket_idx];
+ if (ss->time_window_ms == 0) {
+ return bucket->level;
+ }
+
unsigned L = bucket->level;
long long T = ss->time_window_ms;
long long t0 = bucket->last_update_ms;
@@ -248,7 +251,11 @@ struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char prec
return pthis;
}
-void move_registers_forward(struct spread_sketch *ss, struct timeval *now) {
+void move_registers_forward(struct spread_sketch *ss, const struct timeval *now) {
+ if (ss->time_window_ms == 0) {
+ return;
+ }
+
long long reset_reg_count = hll_get_reset_register_count(ss->precision, ss->time_window_ms, *now, &ss->reset_time);
if (reset_reg_count > 0) {
for (int i = 0; i < ss->depth * ss->width; i++) {
@@ -267,12 +274,14 @@ void min_level_state_update(struct spread_sketch *ss, uint32_t level_old, uint32
}
// return 0 if not added, return 1 if added
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, , struct timeval now) {
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now) {
uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1;
long long now_ms = now.tv_sec * 1000 + now.tv_usec / 1000;
if (level <= ss->level_min) {
- if (item_hash == DUMMY_ITEM_HASH) {
+ if (item_hash == DUMMY_ITEM_HASH) { // TODO: 这个地方的逻辑对不上,其实返回1应该是“在sketch中”就返回1,为了get exdata 的问题,增加了一种特殊情况。
+ // 正常来说,应该是在任何时候,都调用add_sketch,不区分primary or not,并走同样的逻辑,在不包含于sketch时不get。
+ // todo: 不对,如果hll 在内部,那么hll 的更新总是要完成,这里只是区别了要更新hll 和不更新hll 的情况
return 0; // DUMMY_ITEM_HASH is the case of "recording a key" instead of update hll. Just return 0 to inform the key is not added to the sketch
}
@@ -311,7 +320,7 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
uint32_t true_level = bucket->content == NULL ? 0: cal_true_level(ss, bucket_idx, now_ms);
if (level > true_level) {
- min_level_state_update(ss, old_level, level);
+ min_level_state_update(ss, true_level, level);
// printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, true_level, level, i, hash_x % ss->width);
const struct entry *content_old = bucket->content;
if (content_old != NULL) {
@@ -332,6 +341,11 @@ int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_leng
return in_sketch ? 1 : 0;
}
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now) {
+ uint64_t hash = XXH3_64bits(item, item_len);
+ return spread_sketch_add_hash(ss, key, key_length, hash, arg, now);
+}
+
double spread_sketch_point_query(const struct spread_sketch *ss, const char *key, size_t key_length) {
uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171);
uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
@@ -350,7 +364,7 @@ double spread_sketch_point_query(const struct spread_sketch *ss, const char *key
return count_min;
}
-struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct timeval *now) {
+struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, const struct timeval *now) {
struct spread_sketch *duplicate = malloc(sizeof(struct spread_sketch));
duplicate->depth = ss->depth;
duplicate->width = ss->width;
@@ -367,17 +381,6 @@ struct spread_sketch *duplicate_and_step(const struct spread_sketch *ss, struct
return duplicate;
}
-double spread_sketch_query(const struct spread_sketch *ss, const char *key, size_t key_length, struct timeval now) {
- if (hll_should_slide(ss->precision, ss->time_window_ms, now, ss->reset_time)) {
- struct spread_sketch *duplicate = duplicate_and_step(ss, &now);
- double ret = spread_sketch_point_query(duplicate, key, key_length);
- spread_sketch_free(duplicate);
- return ret;
- } else {
- return spread_sketch_point_query(ss, key, key_length);
- }
-}
-
void spread_sketch_free(struct spread_sketch *ss) {
smart_ptr_table_free(ss->table);
for (int i = 0; i < ss->depth * ss->width; i++) {
@@ -401,12 +404,13 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
continue;
}
+ hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision);
+
if (bucket_dst->content == NULL) {
bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL);
bucket_dst->level = bucket_src->level;
bucket_dst->last_update_ms = bucket_src->last_update_ms;
- bucket_dst->sthll_register = hll_duplicate(bucket_src->sthll_register, dst->precision);
min_level_state_update(dst, 0, bucket_src->level);
continue;
}
@@ -431,8 +435,6 @@ void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *
bucket_dst->last_update_ms = bucket_src->last_update_ms;
}
}
-
- hll_merge(bucket_dst->sthll_register, src->buckets[i].sthll_register, dst->precision);
}
// for exdata
@@ -518,8 +520,53 @@ size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t
return count;
}
-double spread_sketch_point_query
+void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys) {
+ size_t count_max = HASH_COUNT(ss->table->entry);
+ size_t count = 0;
+
+ *keys = malloc(count_max * sizeof(char *));
+ *key_lens = malloc(count_max * sizeof(size_t));
+
+ struct entry *content, *tmp;
+ HASH_ITER(hh, ss->table->entry, content, tmp) {
+ if (content->dying) {
+ continue;
+ }
+
+ (*keys)[count] = content->key;
+ (*key_lens)[count] = content->key_len;
+ count++;
+ }
+}
+
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len) {
+ double est = spread_sketch_point_query(ss, key, key_len);
+ return est;
+}
+char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len) {
+ uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_len, 171);
+ uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32);
+ uint32_t hash_x2 = (uint32_t) hash_x_tmp;
+ uint32_t *register_ret = NULL;
+ double min_cardinality = (double)INT32_MAX;
+
+ for (int i = 0; i < ss->depth; i++) {
+ uint32_t hash_x = hash_x1 + i * hash_x2;
+ int bucket_idx = (hash_x % ss->width) + i * ss->width;
+
+ double est = hll_count(ss->buckets[bucket_idx].sthll_register, ss->precision, ss->reset_idx, ss->time_window_ms);
+ if (est < min_cardinality) {
+ min_cardinality = est;
+ register_ret = ss->buckets[bucket_idx].sthll_register;
+ }
+ }
+
+ char *enc;
+ size_t enc_len;
+ hll_encode_into_base64(register_ret, ss->precision, &enc, &enc_len);
+ return enc;
+}
struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) {
struct spread_sketch *dst = malloc(sizeof(struct spread_sketch));
@@ -549,10 +596,10 @@ void spread_sketch_get_parameter_recommendation(int expected_super_spreader_numb
*depth_out = logk;
int w;
- if (max_super_spreader_number <= 100) {
- w = max_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width
+ if (expected_super_spreader_number <= 100) {
+ w = expected_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width
} else {
- w = max_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous
+ w = expected_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous
}
if (w < 40) {
w = 40;
@@ -578,9 +625,6 @@ void spread_sketch_change_precision(struct spread_sketch *ss, unsigned char prec
ss->precision = precision;
}
-void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz) {
- hyperloglog_serialize_for_networking(data->hll, blob, blob_size);
-}
// todo: serialize(swarm kv)
// todo: swarm kv 上实现reset 和copy。 \ No newline at end of file
diff --git a/src/cells/spread_sketch.h b/src/cells/spread_sketch.h
index baec128..b1c7a34 100644
--- a/src/cells/spread_sketch.h
+++ b/src/cells/spread_sketch.h
@@ -1,41 +1,38 @@
#pragma once
-#include <stddef.h>
+
#ifdef __cplusplus
extern "C"{
#endif
#include "exdata.h"
+#include <stdint.h> //uint64_t
+#include <sys/time.h> // struct timeval
+#include <stddef.h>
#define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0
struct spread_sketch;
-
struct spread_sketch *spread_sketch_new(int depth, int width, unsigned char precision, int time_window_ms, struct timeval now);
-
void spread_sketch_free(struct spread_sketch *ss);
void spread_sketch_reset(struct spread_sketch *ss);
-int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now);
-// void spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg);
-// TODO: 增加add_hash 接口
-void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
+int spread_sketch_add_hash(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg, struct timeval now);
+int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, const char* item, size_t item_len, void *arg, struct timeval now);
-void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
+void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn);
// get the number of cells in the heavy keeper
int spread_sketch_get_count(const struct spread_sketch *ss);
-size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas);
-// 一系列对于spread sketch的查询
-// size_t spread_sketch_list_keys(const struct spread_sketch *ss, const char *keys[], size_t n_keys);
-// const uint32_t spread_sketch_query_register(ss, const char *key, size_t key_len);
-
-// size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas);
+void spread_sketch_list_keys(const struct spread_sketch *ss, char ***keys, size_t **key_lens, size_t *n_keys);
+double spread_sketch_get_cardinality(const struct spread_sketch *ss, const char *key, size_t key_len);
+char *spread_sketch_get_hll_base64_serialization(const struct spread_sketch *ss, const char *key, size_t key_len);
+void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len);
void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src);
diff --git a/src/cube.c b/src/cube.c
index 6069cdd..e8231d3 100644
--- a/src/cube.c
+++ b/src/cube.c
@@ -18,6 +18,7 @@
#define DEFAULT_N_METRIC 32
#define DEFAULT_N_CUBE 64
+static const struct timeval DUMMY_TIME_VAL = {0, 0};
struct exdata_new_args {
const struct field *cell_dimensions;
@@ -531,10 +532,13 @@ struct cube *cube_new(const struct field *dimensions, size_t n_dimensions, enum
cube->table = hash_table_new(max_n_cell);
hash_table_set_exdata_schema(cube->table, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
break;
- case SAMPLING_MODE_TOP_CARDINALITY:
- cube->spread_sketch = spread_sketch_new(max_n_cell);
+ case SAMPLING_MODE_TOP_CARDINALITY: {
+ int width, depth;
+ unsigned char precision;
+ spread_sketch_get_parameter_recommendation(max_n_cell, &depth, &width, &precision);
+ cube->spread_sketch = spread_sketch_new(depth, width, precision, 0, DUMMY_TIME_VAL);
spread_sketch_set_exdata_schema(cube->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
- break;
+ break; }
default:
assert(0);
break;
@@ -674,16 +678,11 @@ struct cell *get_cell_in_spread_sketch_cube(struct cube *cube, const struct fiel
if (cube->primary_metric_id != metric_id) {
cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
if (cell_data == NULL) {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args);
+ int tmp_ret = spread_sketch_add_hash(cube->spread_sketch, key, key_len, DUMMY_ITEM_HASH, (void *)&args, DUMMY_TIME_VAL);
if (tmp_ret == 1) {
cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
}
}
- } else {
- int tmp_ret = spread_sketch_add(cube->spread_sketch, key, key_len, item_hash, (void *)&args);
- if (tmp_ret == 1) {
- cell_data = spread_sketch_get0_exdata(cube->spread_sketch, key, key_len);
- }
}
free(key);
@@ -827,10 +826,20 @@ int cube_hll_add(struct cube *cube, int metric_id, const struct field *dimension
return FS_ERR_INVALID_METRIC_ID;
}
- uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
- hash = XXH3_64bits(key, key_len);
+ char *dimension_as_string;
+ size_t dimension_string_len;
+ field_array_to_key(dimensions, n_dimensions, &dimension_as_string, &dimension_string_len);
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimensions;
+
+ int tmp_ret = spread_sketch_add(cube->spread_sketch, dimension_as_string, dimension_string_len, key, key_len, (void *)&args, DUMMY_TIME_VAL);
+ free(dimension_as_string);
+ return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS;
}
+
struct cell *cell_data = NULL;
switch (cube->sampling_mode) {
case SAMPLING_MODE_COMPREHENSIVE: {
@@ -840,7 +849,7 @@ int cube_hll_add(struct cube *cube, int metric_id, const struct field *dimension
cell_data = get_cell_in_topk_cube(cube, dimensions, n_dimensions, 0, metric_id);
break;}
case SAMPLING_MODE_TOP_CARDINALITY: {
- cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, hash, metric_id);
+ cell_data = get_cell_in_spread_sketch_cube(cube, dimensions, n_dimensions, 0, metric_id);
break;}
default:
assert(0);
@@ -881,8 +890,20 @@ int cube_hll_add_field(struct cube *cube, int metric_id, const struct field *dim
uint64_t hash = 0; // just any value, if we do not need to update the primary metric of spread sketch cube, hash value is not used
if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
+ char *key;
+ size_t key_len;
+ field_array_to_key(dimensions, n_dimensions, &key, &key_len);
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimensions;
hash = field_array_to_hash(tags_key, n_tag_key);
+
+ int tmp_ret = spread_sketch_add_hash(cube->spread_sketch, key, key_len, hash, (void *)&args, DUMMY_TIME_VAL);
+ free(key);
+ return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS;
}
+
struct cell *cell_data = NULL;
switch (cube->sampling_mode) {
case SAMPLING_MODE_COMPREHENSIVE: {
@@ -920,6 +941,24 @@ int cube_counter_incrby(struct cube *cube, int metric_id, const struct field *di
if (manifest == NULL || manifest->type != METRIC_TYPE_COUNTER) {
return FS_ERR_INVALID_METRIC_ID;
}
+
+ if (cube->primary_metric_id == metric_id && cube->sampling_mode == SAMPLING_MODE_TOPK) {
+ if (increment <= 0) {
+ return FS_ERR_INVALID_PARAM;
+ }
+
+ char *key;
+ size_t key_len;
+ field_array_to_key(dimensions, n_dimensions, &key, &key_len);
+
+ struct exdata_new_args args;
+ args.cell_dimensions = dimensions;
+ args.n_dimensions = n_dimensions;
+
+ int tmp_ret = heavy_keeper_add(cube->heavykeeper, key, key_len, increment, (void *)&args);
+ free(key);
+ return tmp_ret == 1 ? FS_OK : FS_ERR_TOO_MANY_CELLS;
+ }
struct cell *cell_data = NULL;
switch (cube->sampling_mode) {
@@ -1061,10 +1100,13 @@ struct cube *cube_fork(const struct cube *cube) {
ret->table = hash_table_new(cube->max_n_cell);
hash_table_set_exdata_schema(ret->table, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
break;
- case SAMPLING_MODE_TOP_CARDINALITY:
- ret->spread_sketch = spread_sketch_new(cube->max_n_cell);
+ case SAMPLING_MODE_TOP_CARDINALITY: {
+ int width, depth, dummy_time;
+ unsigned char precision;
+ spread_sketch_get_parameter(cube->spread_sketch, &depth, &width, &precision, &dummy_time);
+ ret->spread_sketch = spread_sketch_new(depth, width, precision, 0, DUMMY_TIME_VAL);
spread_sketch_set_exdata_schema(ret->spread_sketch, exdata_new_i, exdata_free_i, exdata_merge_i, exdata_reset_i, exdata_copy_i);
- break;
+ break;}
default:
assert(0);
break;
@@ -1114,6 +1156,10 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
return;
}
+ char **spread_sketch_keys = NULL;
+ size_t *spread_sketch_keys_lens = NULL;
+ long long *heavy_keeper_counts = NULL;
+
struct cell **cell_datas = (struct cell **)malloc(sizeof(struct cell *) * n_cell_tmp);
switch (cube->sampling_mode) {
case SAMPLING_MODE_COMPREHENSIVE:
@@ -1122,8 +1168,12 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
case SAMPLING_MODE_TOPK:
heavy_keeper_list(cube->heavykeeper, (void **)cell_datas, n_cell_tmp);
break;
- case SAMPLING_MODE_TOP_CARDINALITY:
- spread_sketch_list(cube->spread_sketch, (void **)cell_datas, n_cell_tmp);
+ case SAMPLING_MODE_TOP_CARDINALITY: {
+ spread_sketch_list_keys(cube->spread_sketch, &spread_sketch_keys, &spread_sketch_keys_lens, &n_cell_tmp);
+ for (int i = 0; i < n_cell_tmp; i++) {
+ cell_datas[i] = spread_sketch_get0_exdata(cube->spread_sketch, spread_sketch_keys[i], spread_sketch_keys_lens[i]);
+ }
+ }
break;
default:
assert(0);
@@ -1134,7 +1184,7 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
struct tmp_sorted_data_spread_sketch_cell *tmp_sorted_data = (struct tmp_sorted_data_spread_sketch_cell *)malloc(sizeof(struct tmp_sorted_data_spread_sketch_cell) * n_cell_tmp);
for (int i = 0; i < n_cell_tmp; i++) {
tmp_sorted_data[i].data = cell_datas[i];
- tmp_sorted_data[i].hll_value = metric_hll_get(cell_datas[i]->slots[cube->primary_metric_id]);
+ tmp_sorted_data[i].hll_value = spread_sketch_get_cardinality(cube->spread_sketch, spread_sketch_keys[i], spread_sketch_keys_lens[i]);
}
qsort(tmp_sorted_data, n_cell_tmp, sizeof(struct tmp_sorted_data_spread_sketch_cell), compare_tmp_sorted_data_spread_sketch_cell);
@@ -1162,6 +1212,9 @@ void cube_get_cells(const struct cube *cube, struct field_list **cell_dimensions
}
free(cell_datas);
+ free(heavy_keeper_counts);
+ free(spread_sketch_keys);
+ free(spread_sketch_keys_lens);
}
const struct cell *get_cell_by_tag_list(const struct cube *cube, const struct field_list *fields)
@@ -1211,6 +1264,20 @@ const struct metric *get_metric_by_tag_list(const struct cube *cube, const struc
int cube_counter_get(const struct cube *cube, int metric_id, const struct field_list *fields, long long *value)
{
+ if (cube->sampling_mode == SAMPLING_MODE_TOPK && cube->primary_metric_id == metric_id) {
+ char *dimension_in_string;
+ size_t dimension_string_len;
+ field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len);
+
+ long long count = 0;
+ void *exdata_dummy = NULL;
+ heavy_keeper_one_point_query(cube->heavykeeper, dimension_in_string, dimension_string_len, &count, &exdata_dummy);
+ *value = count;
+
+ free(dimension_in_string);
+ return count == 0 ? FS_ERR_INVALID_TAG : FS_OK;
+ }
+
int ret;
const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret);
if (ret != FS_OK) {
@@ -1226,6 +1293,18 @@ int cube_counter_get(const struct cube *cube, int metric_id, const struct field_
int cube_hll_get(const struct cube *cube, int metric_id, const struct field_list *fields, double *value)
{
+ if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
+ char *dimension_in_string;
+ size_t dimension_string_len;
+ field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len);
+
+ double hll_value = spread_sketch_get_cardinality(cube->spread_sketch, dimension_in_string, dimension_string_len);
+ *value = hll_value;
+
+ free(dimension_in_string);
+ return FS_OK;
+ }
+
int ret;
const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret);
if (ret != FS_OK) {
@@ -1268,7 +1347,17 @@ int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const
return FS_OK;
}
-int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *fields, char **blob, size_t *blob_size) {
+int cube_get_serialization_as_base64(const struct cube *cube, int metric_id, const struct field_list *fields, char **blob, size_t *blob_size) {
+ if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY && cube->primary_metric_id == metric_id) {
+ char *dimension_in_string;
+ size_t dimension_string_len;
+ field_array_to_key(fields->field, fields->n_field, &dimension_in_string, &dimension_string_len);
+
+ *blob = spread_sketch_get_hll_base64_serialization(cube->spread_sketch, dimension_in_string, dimension_string_len);
+ *blob_size = strlen(*blob);
+ return FS_OK;
+ }
+
int ret;
const struct metric *metric = get_metric_by_tag_list(cube, fields, metric_id, &ret);
if (ret != FS_OK) {
@@ -1304,8 +1393,13 @@ void cube_get_metrics_in_cell(const struct cube *cube, const struct field_list *
return;
}
- *metric_id_out = (int *)malloc(sizeof(int) * cell_data->next_index);
+ *metric_id_out = (int *)malloc(sizeof(int) * cell_data->next_index + 1); // +1: for primary metric
int n_metric = 0;
+ if (cube->sampling_mode == SAMPLING_MODE_TOP_CARDINALITY || cube->sampling_mode == SAMPLING_MODE_TOPK) { // primary metric is not stored in cell_data
+ (*metric_id_out)[n_metric] = cube->primary_metric_id;
+ n_metric++;
+ }
+
for (int i = 0; i < cell_data->next_index; i++) {
if (cell_data->slots[i] != NULL) {
(*metric_id_out)[n_metric] = i;
diff --git a/src/cube.h b/src/cube.h
index 8b6db1c..134ede8 100644
--- a/src/cube.h
+++ b/src/cube.h
@@ -35,7 +35,7 @@ int cube_counter_get(const struct cube *cube, int metric_id, const struct field_
int cube_hll_get(const struct cube *cube, int metric_id, const struct field_list *dimensions, double *value);
int cube_histogram_value_at_percentile(const struct cube *cube, int metric_id, const struct field_list *dimensions, double percentile, long long *value);
int cube_histogram_count_le_value(const struct cube *cube, int metric_id, const struct field_list *dimensions, long long value, long long *count);
-int cube_get_serialization(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size);
+int cube_get_serialization_as_base64(const struct cube *cube, int metric_id, const struct field_list *dimensions, char **blob, size_t *blob_size);
int cube_get_cell_count(const struct cube *cube);
enum sampling_mode cube_get_sampling_mode(const struct cube *cube);
diff --git a/src/exporter/cjson_exporter.c b/src/exporter/cjson_exporter.c
index 1606df5..6a52c6a 100644
--- a/src/exporter/cjson_exporter.c
+++ b/src/exporter/cjson_exporter.c
@@ -542,7 +542,7 @@ struct export_kv_pair *cell_query_with_iter(const struct cell_iter *iter, int me
if (type == METRIC_TYPE_HLL || type == METRIC_TYPE_HISTOGRAM) {
char *value;
size_t len;
- fieldstat_get_serialized_blob(iter->instance, cube_id, metric_id, &iter->tag_list[iter->curr_cell_idx], &value, &len);
+ fieldstat_metric_get_serialization_as_base64(iter->instance, cube_id, metric_id, &iter->tag_list[iter->curr_cell_idx], &value, &len);
if (value == NULL) {
return NULL;
}
diff --git a/src/fieldstat.c b/src/fieldstat.c
index 12c482b..4f4582b 100644
--- a/src/fieldstat.c
+++ b/src/fieldstat.c
@@ -306,7 +306,7 @@ long long fieldstat_histogram_count_le_value(const struct fieldstat *instance, i
return count;
}
-void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size)
+void fieldstat_metric_get_serialization_as_base64(const struct fieldstat *instance, int cube_id, int metric_id, const struct field_list *cell_dimensions, char **blob, size_t *blob_size)
{
const struct cube *cube = cube_manager_get_cube_by_id(instance->cube_manager, cube_id);
if (cube == NULL) {
@@ -315,7 +315,7 @@ void fieldstat_get_serialized_blob(const struct fieldstat *instance, int cube_id
return;
}
- cube_get_serialization(cube, metric_id, cell_dimensions, blob, blob_size);
+ cube_get_serialization_as_base64(cube, metric_id, cell_dimensions, blob, blob_size);
}
void fieldstat_tag_list_arr_free(struct field_list *tag_list, size_t n_cell)
diff --git a/src/metrics/hyperloglog.c b/src/metrics/hyperloglog.c
index b13a2a2..ea05887 100644
--- a/src/metrics/hyperloglog.c
+++ b/src/metrics/hyperloglog.c
@@ -14,6 +14,8 @@
#include <stdlib.h>
#include "base64/b64.h"
+#include "hll_common.h"
+
const size_t BLOB_HDR_SIZE= offsetof(struct hyperloglog, registers);
@@ -171,36 +173,9 @@ struct hyperloglog *hyperloglog_deserialize(const char *blob, size_t blob_sz)
return h;
}
-void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz)
+void hyperloglog_serialize_into_base64(const struct hyperloglog *h, char **blob, size_t *blob_sz)
{
- size_t sz=0, offset=0;
- size_t num_reg = NUM_REG(h->cfg.precision);
- size_t words = INT_CEIL(num_reg, REG_PER_WORD);
-
- sz = hyperloglog_serialized_size(h) + 1; // [precision][version][data...
- unsigned char *buffer = ALLOC(unsigned char, sz);
-
- const unsigned char version = 1;
- memcpy(buffer+offset, &version, 1);
- offset += 1;
-
- unsigned char precision = h->cfg.precision;
- memcpy(buffer+offset, &precision, 1);
- offset += 1;
-
- for (int i = 0; i < words; i++) {
- uint32_t word = h->registers[i];
- buffer[offset++] = word >> 24;
- buffer[offset++] = word >> 16;
- buffer[offset++] = word >> 8;
- buffer[offset++] = word;
- }
-
- char *enc = b64_encode(buffer, sz);
- free(buffer);
- *blob_sz = strlen(enc);
- *blob = enc;
- return;
+ hll_encode_into_base64(h->registers, h->cfg.precision, blob, blob_sz);
}
void hyperloglog_merge_blob(struct hyperloglog *dest, const char *blob, size_t blob_sz)
diff --git a/src/metrics/hyperloglog.h b/src/metrics/hyperloglog.h
index b0e6db5..b1b2330 100644
--- a/src/metrics/hyperloglog.h
+++ b/src/metrics/hyperloglog.h
@@ -49,7 +49,7 @@ void hyperloglog_reset(struct hyperloglog *h);
double hyperloglog_count(const struct hyperloglog *h);
size_t hyperloglog_serialized_size(const struct hyperloglog *h);
void hyperloglog_serialize(const struct hyperloglog *h, char **blob, size_t *blob_sz);
-void hyperloglog_serialize_for_networking(const struct hyperloglog *h, char **blob, size_t *blob_sz);
+void hyperloglog_serialize_into_base64(const struct hyperloglog *h, char **blob, size_t *blob_sz);
struct hyperloglog *hyperloglog_deserialize(const char *blob, size_t blob_sz);
int hyperloglog_merge(struct hyperloglog *dest, const struct hyperloglog *src);
void hyperloglog_merge_blob(struct hyperloglog *dest, const char *blob, size_t blob_sz);
diff --git a/src/metrics/metric.c b/src/metrics/metric.c
index 42f9c94..4b0c290 100644
--- a/src/metrics/metric.c
+++ b/src/metrics/metric.c
@@ -297,7 +297,7 @@ void metric_serialize(const struct metric *pthis, char **blob, size_t *blob_size
struct metric_data *data = pthis->data;
enum metric_type type = pthis->type;
if (type == METRIC_TYPE_HLL) {
- hyperloglog_serialize_for_networking(data->hll, blob, blob_size);
+ hyperloglog_serialize_into_base64(data->hll, blob, blob_size);
return;
}
if (type == METRIC_TYPE_HISTOGRAM) {