| author | chenzizhan <[email protected]> | 2024-07-12 18:37:40 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2024-07-12 18:37:40 +0800 |
| commit | 6b3dcefab5b4049a3f40be9faab6a05c79a8bb5b (patch) | |
| tree | 97dadc0663c837671776729aa7a75ca0001d8752 /src/cells | |
| parent | dcc5329f090d4d3e1f2b1ea6c09393c0397fc111 (diff) | |
renames
Diffstat (limited to 'src/cells')
| -rw-r--r-- | src/cells/hash_table.c | 230 |
| -rw-r--r-- | src/cells/hash_table.h | 30 |
| -rw-r--r-- | src/cells/heavy_keeper.c | 822 |
| -rw-r--r-- | src/cells/heavy_keeper.h | 48 |
| -rw-r--r-- | src/cells/spread_sketch.c | 366 |
| -rw-r--r-- | src/cells/spread_sketch.h | 41 |
6 files changed, 1537 insertions, 0 deletions
diff --git a/src/cells/hash_table.c b/src/cells/hash_table.c new file mode 100644 index 0000000..8394565 --- /dev/null +++ b/src/cells/hash_table.c @@ -0,0 +1,230 @@ +#include "hash_table.h" + +#include <stdio.h> +#include <assert.h> +#include <stdlib.h> +#include <string.h> + +#include "uthash.h" + +#include "fieldstat.h" +#include "exdata.h" + +struct tag_exdata_item { + char *key; + size_t key_len; + void *exdata; + bool dying; + UT_hash_handle hh; +}; + +struct hash_table { // todo: 文件改名字 + struct tag_exdata_item *tag_id_map; + int current_cell_num; + int max_cell_num; + + exdata_new_cb new_fn; + exdata_free_cb free_fn; + exdata_merge_cb merge_fn; + exdata_reset_cb reset_fn; + exdata_copy_cb copy_fn; +}; +static void *default_new_fn(void *arg) { + return NULL; +} + +static void default_free_fn(void *exdata) { + return; +} + +static void default_merge_fn(void *dest, void *src) { + return; +} + +static void default_reset_fn(void *exdata) { + return; +} + +static void *default_copy_fn(void *exdata) { + return exdata; +} + + +struct hash_table *hash_table_new(int max_query_num) { + struct hash_table *pthis = calloc(1, sizeof(struct hash_table)); + + pthis->max_cell_num = max_query_num; + + pthis->new_fn = default_new_fn; + pthis->free_fn = default_free_fn; + pthis->merge_fn = default_merge_fn; + pthis->reset_fn = default_reset_fn; + pthis->copy_fn = default_copy_fn; + return pthis; +} + +void hash_table_free(struct hash_table *pthis) { + struct tag_exdata_item *item, *tmp; + HASH_ITER(hh, pthis->tag_id_map, item, tmp) { + HASH_DEL(pthis->tag_id_map, item); + free(item->key); + pthis->free_fn(item->exdata); + free(item); + } + free(pthis); +} + +void hash_table_reset(struct hash_table *pthis) { + struct tag_exdata_item *node, *tmp; + HASH_ITER(hh, pthis->tag_id_map, node, tmp) { + if (!node->dying) { + node->dying = true; + pthis->reset_fn(node->exdata); + continue; + } + HASH_DEL(pthis->tag_id_map, node); + free(node->key); + pthis->free_fn(node->exdata); + free(node); + } + + pthis->current_cell_num = 0; +} + +static char *my_keydup(const char *key, size_t key_len) { + char *ret = calloc(1, key_len + 1); + memcpy(ret, key, key_len); + return ret; +} + +int hash_table_add(struct hash_table *pthis, const char *key, size_t key_len, void *arg) { + struct tag_exdata_item *item; + HASH_FIND(hh, pthis->tag_id_map, key, key_len, item); + if (item != NULL && !item->dying) { + return 1; + } + + if (pthis->current_cell_num >= pthis->max_cell_num) { + return 0; + } + + if (item != NULL) { + assert(item->dying); + item->dying = false; + pthis->current_cell_num++; + return 1; + } + + item = calloc(1, sizeof(struct tag_exdata_item)); + item->key = my_keydup(key, key_len); + item->key_len = key_len; + item->exdata = pthis->new_fn(arg); + item->dying = false; + HASH_ADD_KEYPTR(hh, pthis->tag_id_map, item->key, key_len, item); + + pthis->current_cell_num++; + return 1; +} + +void hash_table_set_exdata_schema(struct hash_table *pthis, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { + pthis->new_fn = new_fn; + pthis->free_fn = free_fn; + pthis->merge_fn = merge_fn; + pthis->reset_fn = reset_fn; + pthis->copy_fn = copy_fn; +} + +void *hash_table_get0_exdata(struct hash_table *pthis, const char *key, size_t key_len) { + struct tag_exdata_item *item; + HASH_FIND(hh, pthis->tag_id_map, key, key_len, item); + if (item == NULL || item->dying) { + return NULL; + } + return item->exdata; +} + +int hash_table_get_count(const struct hash_table 
*pthis) { + return pthis->current_cell_num; +} + +size_t hash_table_list(const struct hash_table *pthis, void **exdatas, size_t n_exdatas) { + size_t actual_len = pthis->current_cell_num; + if (actual_len == 0) { + return 0; + } + + struct tag_exdata_item *item, *tmp; + size_t i = 0; + HASH_ITER(hh, pthis->tag_id_map, item, tmp) { + if (item->dying) { + continue; + } + if (i >= n_exdatas) { + break; + } + exdatas[i] = item->exdata; + i++; + } + return actual_len < n_exdatas ? actual_len : n_exdatas; +} + +int hash_table_merge(struct hash_table *dest, struct hash_table *src) { + struct tag_exdata_item *item_src, *tmp; + struct tag_exdata_item *item_dst; + HASH_ITER(hh, src->tag_id_map, item_src, tmp) { + if (item_src->dying) { + continue; + } + + HASH_FIND(hh, dest->tag_id_map, item_src->key, item_src->key_len, item_dst); + if (item_dst != NULL && !item_dst->dying) { + dest->merge_fn(item_dst->exdata, item_src->exdata); + continue; + } + + if (dest->current_cell_num >= dest->max_cell_num) { + continue; // cannot add more cells + } + + if (item_dst == NULL) { + item_dst = calloc(1, sizeof(struct tag_exdata_item)); + item_dst->key = my_keydup(item_src->key, item_src->key_len); + item_dst->key_len = item_src->key_len; + item_dst->dying = false; + item_dst->exdata = dest->copy_fn(item_src->exdata); + HASH_ADD_KEYPTR(hh, dest->tag_id_map, item_dst->key, item_dst->key_len, item_dst); + dest->current_cell_num++; + } else { + assert(item_dst->dying); + item_dst->dying = false; + dest->merge_fn(item_dst->exdata, item_src->exdata); //item_dst->exdata should be empty, but just merge to it. + dest->current_cell_num++; + } + } + return 0; +} + +struct hash_table *hash_table_copy(const struct hash_table *src) { + struct hash_table *pthis = calloc(1, sizeof(struct hash_table)); + pthis->max_cell_num = src->max_cell_num; + pthis->current_cell_num = src->current_cell_num; + pthis->new_fn = src->new_fn; + pthis->free_fn = src->free_fn; + pthis->merge_fn = src->merge_fn; + pthis->reset_fn = src->reset_fn; + pthis->copy_fn = src->copy_fn; + + struct tag_exdata_item *item, *tmp; + HASH_ITER(hh, src->tag_id_map, item, tmp) { + if (item->dying) { + continue; + } + struct tag_exdata_item *new_item = calloc(1, sizeof(struct tag_exdata_item)); + new_item->key = my_keydup(item->key, item->key_len); + new_item->key_len = item->key_len; + new_item->exdata = pthis->copy_fn(item->exdata); + new_item->dying = false; + HASH_ADD_KEYPTR(hh, pthis->tag_id_map, new_item->key, new_item->key_len, new_item); + } + return pthis; +} diff --git a/src/cells/hash_table.h b/src/cells/hash_table.h new file mode 100644 index 0000000..59fc8e8 --- /dev/null +++ b/src/cells/hash_table.h @@ -0,0 +1,30 @@ +#pragma once + +#include <stddef.h> + +#ifdef __cplusplus +extern "C"{ +#endif + +#include "exdata.h" + +struct hash_table; + +struct hash_table *hash_table_new(int max_query_num); +void hash_table_set_exdata_schema(struct hash_table *pthis, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); +void hash_table_free(struct hash_table *pthis); +void hash_table_reset(struct hash_table *pthis); +int hash_table_merge(struct hash_table *dest, struct hash_table *src); +struct hash_table *hash_table_copy(const struct hash_table *src); + +// int hash_table_add(struct hash_table *pthis, const char *key, size_t key_len, int count, void *arg); +int hash_table_add(struct hash_table *pthis, const char *key, size_t key_len, void *arg); + +// void *hash_table_get0_exdata(struct 
hash_table *pthis, const char *key, size_t key_len); +void *hash_table_get0_exdata(struct hash_table *pthis, const char *key, size_t key_len); +int hash_table_get_count(const struct hash_table *pthis); +size_t hash_table_list(const struct hash_table *pthis, void **exdatas, size_t n_exdatas); + +#ifdef __cplusplus +} +#endif
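A minimal usage sketch for the new hash_table cell (illustrative only, not part of this commit). It assumes the callback typedefs in exdata.h match the default callbacks defined in hash_table.c: new takes a `void *arg` and returns a `void *`, free/reset take a `void *`, merge takes dest and src, copy returns a new `void *`. The per-key exdata here is just a heap-allocated counter.

```c
/* Illustrative sketch, not part of the commit. Assumes the exdata_* typedefs
 * in exdata.h have the same shapes as the default callbacks in hash_table.c. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "hash_table.h"

static void *counter_new(void *arg)          { (void)arg; return calloc(1, sizeof(long)); }
static void  counter_free(void *p)           { free(p); }
static void  counter_merge(void *d, void *s) { *(long *)d += *(long *)s; }
static void  counter_reset(void *p)          { *(long *)p = 0; }
static void *counter_copy(void *p)           { long *c = malloc(sizeof(long)); *c = *(long *)p; return c; }

int main(void) {
    struct hash_table *ht = hash_table_new(100);        /* at most 100 live cells */
    hash_table_set_exdata_schema(ht, counter_new, counter_free,
                                 counter_merge, counter_reset, counter_copy);

    const char *key = "example-key";
    if (hash_table_add(ht, key, strlen(key), NULL) == 1) {   /* creates the cell's exdata via new_fn */
        long *cnt = hash_table_get0_exdata(ht, key, strlen(key));
        (*cnt)++;                                             /* caller owns the per-key state */
    }

    printf("live cells: %d\n", hash_table_get_count(ht));
    hash_table_free(ht);
    return 0;
}
```

Note that hash_table_add returns 0 and drops the key once max_query_num live cells exist, so callers should check the return value before relying on hash_table_get0_exdata.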
\ No newline at end of file diff --git a/src/cells/heavy_keeper.c b/src/cells/heavy_keeper.c new file mode 100644 index 0000000..b606a67 --- /dev/null +++ b/src/cells/heavy_keeper.c @@ -0,0 +1,822 @@ +#include "heavy_keeper.h" + +#include <float.h> +#include <limits.h> +#include <math.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> + +#include "minheap/heap.h" +#include "mpack/mpack.h" +#include "xxhash/xxhash.h" +// XXHASH is faster +#define HASH_FUNCTION(keyptr, keylen, hashv) \ + do { \ + hashv = XXH3_64bits(keyptr, keylen); \ + } while (0) + +#include "uthash.h" + +#include "exdata.h" + +#define FP_HASH_KEY 0 +#define NOT_FIND (-1) +#define MAX(a, b) (((a) > (b)) ? (a) : (b)) + +struct entry_data { // the value constitute of a sorted set entry + char *key; + size_t key_len; + + void *exdata; +}; + +struct sorted_set { + heap *heap; + struct hash_node *hash_tbl; + + int n_living_entry; + + exdata_new_cb new_fn; + exdata_free_cb free_fn; + exdata_merge_cb merge_fn; + exdata_reset_cb reset_fn; + exdata_copy_cb copy_fn; +}; + +struct hash_node { + const char *key; // value is equal to entry_data::key(point to the same memory) + size_t key_len; + heap_entry *val; + UT_hash_handle hh; +}; + +struct Bucket { + uint64_t finger_print; + long long count; // the actual count, sum of all uuid_hash_node->count +}; + +// Parameters used in algorithm +struct heavy_keeper_options{ + int r; // the size of the array. Default value: 4 + // Set it by how accurate you want. Value of 4 guarantees an accuracy more than 90% and around 97% in all tests. + // Not too big because it have an impact on both time and memory efficiency. + int w; // M2, the maximum number of buckets every array keeps. Default value: k*log(k) and no less than 100. + // Basically, as long as big enough, it won't affect the accuracy significantly. + double b; // b, bigger variance of flow size is(elephant flows take more ratio), smaller it should be. + // Must bigger than 1. Better not bigger than 1.3, otherwise some elephant flow will be missed. +}; + +struct heavy_keeper { + int K; + struct sorted_set *top_K_heap; + struct heavy_keeper_options params; + struct Bucket *sketch; + + unsigned int rand_state; + + exdata_new_cb new_fn; + exdata_free_cb free_fn; + exdata_merge_cb merge_fn; + exdata_reset_cb reset_fn; + exdata_copy_cb copy_fn; +}; + +static void *default_new_fn(void *arg) { + return NULL; +} + +static void default_free_fn(void *exdata) { + return; +} + +static void default_merge_fn(void *dest, void *src) { + return; +} + +static void default_reset_fn(void *exdata) { + return; +} + +static void *default_copy_fn(void *exdata) { + return exdata; +} + +struct entry_data *entry_data_construct(const char *key, size_t key_len, void *exdata) +{ + struct entry_data *entry_data = (struct entry_data *)malloc(sizeof(struct entry_data)); + + entry_data->key = (char *)malloc(key_len); + memcpy(entry_data->key, key, key_len); + entry_data->key_len = key_len; + entry_data->exdata = exdata; + return entry_data; +} + +void entry_data_destroy(struct entry_data *entry_data, exdata_free_cb free_fn) +{ + if (entry_data == NULL) { + return; + } + free(entry_data->key); + free_fn(entry_data->exdata); + free(entry_data); +} + +static inline long long sorted_set_entry_get_score(const heap_entry *entry) +{ + long long score = *(long long *)entry->key; + if (score == 0) { + return 0; + } + return score - 1; // sorted set will let the count start from 1, 0 for dying entry. 
+} + +struct entry_data *sorted_set_entry_get_data(const heap_entry *entry) +{ + return entry->value; +} + +/* + dying: To reduce the time of HASH_ADD, and to support merely recording a key without any value, give a dying status. +*/ +static inline bool sorted_set_entry_dying(const heap_entry *entry) +{ + long long score = *(long long *)entry->key; + return score == 0; +} + +static inline long long safe_add(long long a, long long b) +{ + if (INT64_MAX - a < b) { + return INT64_MAX; + } + return a + b; +} + +// cppcheck-suppress [constParameterCallback, unmatchedSuppression] +static bool cmp_entry_cb(void *aa, void *bb) +{ + heap_entry *a = (heap_entry *)aa; + heap_entry *b = (heap_entry *)bb; + long long score_a = *(long long *)a->key; + long long score_b = *(long long *)b->key; + + if (score_a < score_b) { + return true; + } + return false; +} + +static int entry_get_index_cb(void *a) +{ + return ((heap_entry *)a)->index; +} + +void entry_set_index_cb(int index, void *a) +{ + ((heap_entry *)a)->index = index; +} + +struct sorted_set *sorted_set_new(int set_size) +{ + struct sorted_set *ret = (struct sorted_set *)malloc(sizeof(struct sorted_set)); + ret->hash_tbl = NULL; + ret->n_living_entry = 0; + heap *h = (heap *)malloc(sizeof(heap)); + init_heap(h, set_size, cmp_entry_cb, entry_get_index_cb, entry_set_index_cb); + + ret->heap = h; + + ret->new_fn = default_new_fn; + ret->free_fn = default_free_fn; + ret->merge_fn = default_merge_fn; + ret->reset_fn = default_reset_fn; + ret->copy_fn = default_copy_fn; + + return ret; +} + +void sorted_set_schema_set(struct sorted_set *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) +{ + ss->new_fn = new_fn; + ss->free_fn = free_fn; + ss->merge_fn = merge_fn; + ss->reset_fn = reset_fn; + ss->copy_fn = copy_fn; +} + +void sorted_set_free(struct sorted_set *ss) +{ + struct hash_node *tbl = ss->hash_tbl; + struct hash_node *h_node, *tmp; + HASH_ITER(hh, tbl, h_node, tmp) { + HASH_DEL(tbl, h_node); + free(h_node); + } + + heap *heap = ss->heap; + heap_entry *node; + for (int i = 0; i < heap->cur_size; i++) { + node = (heap_entry *)heap->nodes[i]; + entry_data_destroy(node->value, ss->free_fn); + free(node->key); + free(node); + } + free(heap->nodes); + free(heap); + + free(ss); +} + +heap_entry *sorted_set_find_entry(const struct sorted_set *ss, const char *key, size_t key_len) +{ + struct hash_node *hash_tbl = ss->hash_tbl; + struct hash_node *s = NULL; + HASH_FIND(hh, hash_tbl, key, key_len, s); + + if (s == NULL) { + return NULL; + } + return (heap_entry *)s->val; +} + +static inline void sorted_set_entry_set_score(struct sorted_set *ss, heap_entry *entry, long long score) +{ + *(long long *)(entry->key) = score; + + adjust_heap_node(ss->heap, entry); +} + +int sorted_set_pop(struct sorted_set *ss) +{ + struct hash_node *hash_tbl = ss->hash_tbl; + heap_entry *entry = (heap_entry *)pop_heap(ss->heap); + if (entry == NULL) { + return -1; + } + struct entry_data *tmp_data = sorted_set_entry_get_data(entry); + + struct hash_node *s = NULL; + HASH_FIND(hh, hash_tbl, tmp_data->key, tmp_data->key_len, s); + assert(s!=NULL); + + HASH_DEL(hash_tbl, s); + free(s); + ss->hash_tbl = hash_tbl; + + int ret; + if (sorted_set_entry_dying(entry)) { + ret = 0; + } else { + ss->n_living_entry--; + ret = 1; + } + + entry_data_destroy(tmp_data, ss->free_fn); + free(entry->key); + free(entry); + + return ret; +} + +static inline bool sorted_set_check_is_full(const struct sorted_set *ss) +{ + return 
ss->heap->cur_size >= ss->heap->max_size; +} + +long long sorted_set_get_min_score(const struct sorted_set *ss) +{ + heap *heap = ss->heap; + if (heap->cur_size == 0) { + return NOT_FIND; + } + const heap_entry *ret = (heap_entry *)(heap->nodes[0]); + + long long score = *(long long *)ret->key; + if (score == 0) { + return NOT_FIND; + } + return score - 1; // sorted set will let the score start from 1, 0 for dying entry. +} + +void sorted_set_insert_to_available_heap(struct sorted_set *ss, const char *key, size_t key_len, long long cnt, void *exdata) +{ + cnt = safe_add(cnt, 1); // sorted set will let the score start from 1, 0 for dying entry. + long long *tmp_cnt = (long long*)malloc(sizeof(long long)); + *tmp_cnt = cnt; + struct entry_data *tmp_data = entry_data_construct(key, key_len, exdata); + heap_entry *node = (heap_entry *)malloc(sizeof(heap_entry)); + node->key = tmp_cnt; + node->value = tmp_data; + + push_heap(ss->heap, (void *)node); + + struct hash_node *hash_tbl = ss->hash_tbl; + struct hash_node *s = (struct hash_node *)malloc(sizeof(struct hash_node)); + s->key = sorted_set_entry_get_data(node)->key; + s->key_len = key_len; + s->val = node; + HASH_ADD_KEYPTR(hh, hash_tbl, s->key, key_len, s); + ss->hash_tbl = hash_tbl; + + ss->n_living_entry++; +} + +int sorted_set_insert(struct sorted_set *ss, const char *key, size_t key_len, long long cnt, void *args) +{ + // if there is a dying record, reborn it to use. + heap_entry *entry = sorted_set_find_entry(ss, key, key_len); + if (entry != NULL) { + if (!sorted_set_entry_dying(entry)) { + assert(0); + return -1; + } + sorted_set_entry_set_score(ss, entry, safe_add(cnt, 1)); // sorted set will let the score start from 1, 0 for dying entry. + + ss->n_living_entry++; + return 1; + } + + if (sorted_set_check_is_full(ss)) { + long long tmp_mincnt = sorted_set_get_min_score(ss); + if (tmp_mincnt != NOT_FIND && cnt <= tmp_mincnt) { // even if all cells in sorted set are dying, the sorted set can still be a full one, in which case, the min score is invalid. + return 0; + } + sorted_set_pop(ss); + } + sorted_set_insert_to_available_heap(ss, key, key_len, cnt, ss->new_fn(args)); + return 1; +} + +static inline int sorted_set_cardinality(const struct sorted_set *ss) +{ + return ss->n_living_entry; +} + +long long sorted_set_get_score(const struct sorted_set *ss, const char *key, size_t key_len) +{ + if (sorted_set_cardinality(ss) == 0) { + return NOT_FIND; + } + const heap_entry *entry = sorted_set_find_entry(ss, key, key_len); + if (entry == NULL) { + return NOT_FIND; + } + if (sorted_set_entry_dying(entry)) { + return NOT_FIND; + } + + return sorted_set_entry_get_score(entry); +} + +int sorted_set_incrby(struct sorted_set *ss, const char *key, size_t key_len, long long score) +{ + heap_entry *entry = sorted_set_find_entry(ss, key, key_len); + if (entry == NULL) { + return -1; + } + long long cnt_old; + if (sorted_set_entry_dying(entry) == false) { + cnt_old = sorted_set_entry_get_score(entry); + cnt_old += 1; // sorted set will let the score start from 1, 0 for dying entry. 
+ } else { + cnt_old = 0; + } + sorted_set_entry_set_score(ss, entry, safe_add(score, cnt_old)); + return 0; +} + +typedef struct { + long long key; + struct entry_data *val; +} tmp_heap_node; + +int cmp_tmp_heap_node(const void *a, const void *b) +{ + const tmp_heap_node *aa = (tmp_heap_node *)a; + const tmp_heap_node *bb = (tmp_heap_node *)b; + + if (aa->key < bb->key) { + return 1; + } else if (aa->key > bb->key) { + return -1; + } else { + return 0; + } +} + +void sorted_set_dump(const struct sorted_set *ss, char **key_out, size_t *key_len_out, void **exdata_out) +{ + struct minheap *h = ss->heap; + tmp_heap_node *tmp_nodes = (tmp_heap_node *)malloc(sizeof(tmp_heap_node) * h->cur_size); + size_t n_living_entry = 0; + for (int i = 0; i < h->cur_size; i++) { + const heap_entry *entry = (heap_entry *)h->nodes[i]; + if (sorted_set_entry_dying(entry)) { + continue; + } + tmp_nodes[n_living_entry].key = sorted_set_entry_get_score(entry); + tmp_nodes[n_living_entry].val = sorted_set_entry_get_data(entry); + n_living_entry++; + } + + assert(n_living_entry == ss->n_living_entry); + qsort(tmp_nodes, n_living_entry, sizeof(tmp_heap_node), cmp_tmp_heap_node); + for (int i = 0; i < n_living_entry; i++) { + key_out[i] = tmp_nodes[i].val->key; + key_len_out[i] = tmp_nodes[i].val->key_len; + exdata_out[i] = tmp_nodes[i].val->exdata; + } + free(tmp_nodes); +} + +struct sorted_set *sorted_set_copy(const struct sorted_set *ss) +{ + struct sorted_set *ret = sorted_set_new(ss->heap->max_size); + sorted_set_schema_set(ret, ss->new_fn, ss->free_fn, ss->merge_fn, ss->reset_fn, ss->copy_fn); + + for (int i = 0; i < ss->heap->cur_size; i++) { + const heap_entry *entry = (heap_entry *)ss->heap->nodes[i]; + if (sorted_set_entry_dying(entry)) { + continue; + } + const struct entry_data *data = sorted_set_entry_get_data(entry); + sorted_set_insert_to_available_heap(ret, data->key, data->key_len, sorted_set_entry_get_score(entry), ss->copy_fn(data->exdata)); + } + + return ret; +} + +// set all the entry to dying status. They will be the first to be popped. +void sorted_set_reset(struct sorted_set *ss) +{ + heap *heap = ss->heap; + heap_entry *entry; + for (int i = 0; i < heap->cur_size; i++) { + entry = (heap_entry *)heap->nodes[i]; + *(long long *)entry->key = 0; + + ss->reset_fn(sorted_set_entry_get_data(entry)->exdata); + } + + ss->n_living_entry = 0; +} + +/* -------------------------------------------------------------------------- */ +/* heavy keeper */ +/* -------------------------------------------------------------------------- */ + +struct Bucket *new_sketch(struct heavy_keeper_options *params) { + size_t array_len = (size_t)params->r * (size_t)params->w; + + struct Bucket *ret = (struct Bucket *)calloc(array_len, sizeof(struct Bucket)); + + return ret; +} + +void params_set_to_default(struct heavy_keeper_options *p, int K) { + if (K > 5000) { // don't let the sketch too large when K gets insanely large + K = 5000; + } + double log_ret = log((double)K) / 2.3; // 2.3: log_e(10), log_ret = log_10(K) + + if (log_ret < 3) { + p->r = 3; + } else { + p->r = (int)(log_ret); + } + p->b = 1.17; // by test, 1.17 is the best + p->w = (int)(log_ret * K * 2); + if (p->w < 150) { + p->w = 150; // determined through test, too small max_bucket_num will let accuracy decrease severely. 
+ } else if (p->w > 600) { + p->w = p->w / 4 + 450; + } +} + +struct heavy_keeper *heavy_keeper_new(int max_query_num) { + struct heavy_keeper *hk = (struct heavy_keeper *)malloc(sizeof(struct heavy_keeper)); + hk->K = max_query_num; + hk->rand_state = 0; + + hk->top_K_heap = sorted_set_new(max_query_num); + + params_set_to_default(&(hk->params), max_query_num); + + hk->sketch = new_sketch(&(hk->params)); + + hk->new_fn = default_new_fn; + hk->free_fn = default_free_fn; + hk->merge_fn = default_merge_fn; + hk->reset_fn = default_reset_fn; + + return hk; +} + +void heavy_keeper_free(struct heavy_keeper *hk) { + sorted_set_free(hk->top_K_heap); + free(hk->sketch); + + free(hk); +} + +void heavy_keeper_reset(struct heavy_keeper *hk) { + sorted_set_reset(hk->top_K_heap); + memset(hk->sketch, 0, sizeof(struct Bucket) * (size_t)hk->params.r * (size_t)hk->params.w); +} + +const int DECAY_POW_TABLE[128] = { // 1.17 ^ exp * RAND_MAX, exp is in [0,127] +2147483647,1835456109,1568765905,1340825560,1146004752,979491241,837172001,715531625,611565491,522705548,446756879,381843486,326361954,278941841,238411830,203770795,174163072,148857327,127228484,108742294,92942132, +79437720,67895487,58030331,49598573,42391943,36232430,30967889,26468281,22622462,19335438,16526015,14124799,12072478,10318357,8819109,7537700,6442479,5506392,4706318,4022494,3438029,2938486,2511527,2146604, +1834704,1568123,1340276,1145535,979090,836829,715239,611315,522491,446574,381687,326228,278828,238314,203687,174092,148796,127176,108698,92904,79405,67868,58007,49578,42375,36218,30955,26457,22613,19328,16519, +14119,12068,10314,8815,7535,6440,5504,4704,4021,3437,2937,2510,2146,1834,1567,1340,1145,979,836,715,611,522,446,382,326,279,238,204,174,149,127,109,93,79,68,58,50,42,36,31,26,23,19,17,14,12,10,9,8,6,6,5, +}; +bool if_need_to_decay(struct heavy_keeper *hk, const struct Bucket *bucket, long long count) { + if (count == 0) { + return false; + } + if (bucket->count < count) { // the exp is 0, so the decay rate is 1 + return true; + } + + long long exp = bucket->count / count; + if (exp > 127) { // decay_rate too small, almost no chance to decay + return false; + } + + // double decay_rate = pow(hk->params.b, -exp); + // p->b = 1.17 is fixed, search table to get result directly. + int decay_rate = DECAY_POW_TABLE[exp]; + + if (rand_r(&(hk->rand_state)) < decay_rate) { + return true; + } + return false; +} + +static inline uint64_t cal_hash_val_with_seed(const char *key, size_t key_len, unsigned int seed) { + return XXH3_64bits_withSeed(key, key_len, seed); +} + +/* +1 for newly add something. 0 for not add. -1 for unexpected cases. 
+*/ +int heavy_keeper_add(struct heavy_keeper *heavy_keeper, const char *key, size_t key_len, long long count, void *arg) { + assert(count >= 0); + if (count == 0) { + if (sorted_set_cardinality(heavy_keeper->top_K_heap) < heavy_keeper->K) { + const struct heap_entry *entry = sorted_set_find_entry(heavy_keeper->top_K_heap, key, key_len); + + if (entry != NULL && !sorted_set_entry_dying(entry)) { + return 0; + } + + sorted_set_insert(heavy_keeper->top_K_heap, key, key_len, count, arg); + return 1; + } + return 0; + } + + struct sorted_set *summary = heavy_keeper->top_K_heap; + + long long old_cnt = sorted_set_get_score(summary, key, key_len); + bool not_in_sorted_set = (old_cnt == NOT_FIND); + long long maxv = 0; + uint64_t fp = cal_hash_val_with_seed(key, key_len, FP_HASH_KEY); + uint64_t h1 = fp; + uint64_t h2 = cal_hash_val_with_seed(key, key_len, FP_HASH_KEY+1); + + for (uint64_t j = 0; j < heavy_keeper->params.r; j++) { + uint64_t hashv = h1 + j * h2; // use `double hashing` strategy + struct Bucket *bucket = &(heavy_keeper->sketch[j * heavy_keeper->params.w + (hashv % heavy_keeper->params.w)]); + + if (bucket->finger_print == fp) { + // If a key is not in the min-heap, then the estimated key size should be no larger than nmin. + // or if the min-heap is not full(min_value == NOT_FIND), every key should be taken into account, so of course it should be added. + // in neither case, bucket->count > nMin && not_in_sorted_set happen. + // The keys whose counts are both larger than nmin and not in min-heap must have the same xxhash value, and its FP stored in bucket represents another field, + // In this case, the sketch won't be updated. This key is expected to be taken into account in another array, + // where its FP is different from the one it should collided with, so that element keys won't be missed. 
+ if (not_in_sorted_set) { + long long min_value = sorted_set_get_min_score(summary); + if (min_value != NOT_FIND && bucket->count > min_value) { + continue; + } + } + bucket->count = safe_add(bucket->count, count); + maxv = MAX(maxv, bucket->count); + } else { + if (!if_need_to_decay(heavy_keeper, bucket, count)) { + continue; + } + + if (bucket->count < count) { + bucket->finger_print = fp; + bucket->count = count; + + maxv = MAX(maxv, count); + } else { + bucket->count -= count; + } + } + } + + if (not_in_sorted_set) { + if (sorted_set_cardinality(summary) != heavy_keeper->K) { + sorted_set_insert(summary, key, key_len, maxv, arg); + return 1; + } + long long min_value = sorted_set_get_min_score(summary); + if (maxv > min_value || min_value == NOT_FIND) { + sorted_set_insert(summary, key, key_len, maxv, arg); + return 1; + } + return 0; + } else { + if (maxv > old_cnt) { + sorted_set_incrby(summary, key, key_len, maxv - old_cnt); + } + return 1; // no popped, but the exdata definitely exists in the sorted set + } +} + +int heavy_keeper_set_exdata_schema(struct heavy_keeper *hk, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { + sorted_set_schema_set(hk->top_K_heap, new_fn, free_fn, merge_fn, reset_fn, copy_fn); + hk->new_fn = new_fn; + hk->free_fn = free_fn; + hk->merge_fn = merge_fn; + hk->reset_fn = reset_fn; + hk->copy_fn = copy_fn; + + return 0; +} + +void *heavy_keeper_get0_exdata(const struct heavy_keeper *hk, const char *key, size_t key_len) { + const heap_entry *entry = sorted_set_find_entry(hk->top_K_heap, key, key_len); + if (entry == NULL || sorted_set_entry_dying(entry)) { + return NULL; + } + return sorted_set_entry_get_data(entry)->exdata; +} + +int heavy_keeper_get_count(const struct heavy_keeper *hk) { + return sorted_set_cardinality(hk->top_K_heap); +} + +size_t heavy_keeper_list(const struct heavy_keeper *hk, void **exdatas, size_t n_exdatas) { + size_t actual_len = sorted_set_cardinality(hk->top_K_heap); + if (actual_len == 0) { + return 0; + } + + char **keys_dummy = (char **)malloc(sizeof(char *) * actual_len); + size_t *key_lens_dummy = (size_t *)malloc(sizeof(size_t) * actual_len); + if (n_exdatas < actual_len) { + void **exdatas_ret = (void **)malloc(sizeof(void *) * actual_len); + sorted_set_dump(hk->top_K_heap, keys_dummy, key_lens_dummy, exdatas_ret); + memcpy(exdatas, exdatas_ret, sizeof(void *) * n_exdatas); + free(exdatas_ret); + } else { + sorted_set_dump(hk->top_K_heap, keys_dummy, key_lens_dummy, exdatas); + } + + free(keys_dummy); + free(key_lens_dummy); + + return actual_len < n_exdatas ? 
actual_len : n_exdatas; +} + +static void heavy_keeper_merge_sketch(struct heavy_keeper *dest, const struct heavy_keeper *src) { + int w = dest->params.w; + int d = dest->params.r; + //idx + for (int array_id = 0; array_id < d; array_id++) { + for (int bucket_id = 0; bucket_id < w; bucket_id++) { + struct Bucket *bucket_dest = &(dest->sketch[array_id * w + bucket_id]); + const struct Bucket *bucket_src = &(src->sketch[array_id * w + bucket_id]); + + if (bucket_dest->finger_print == bucket_src->finger_print) { + bucket_dest->count = safe_add(bucket_dest->count, bucket_src->count); + } else { + if (bucket_dest->count < bucket_src->count) { + bucket_dest->count = bucket_src->count; + bucket_dest->finger_print = bucket_src->finger_print; + } + } + } + } +} + +long long find_count_in_sketch(struct heavy_keeper *hk, const char *key, size_t key_len) { + struct Bucket *bucket; + uint64_t fp = cal_hash_val_with_seed(key, key_len, FP_HASH_KEY); + uint64_t h1 = fp; + uint64_t h2 = cal_hash_val_with_seed(key, key_len, FP_HASH_KEY+1); + + long long maxv = 0; + for (uint64_t array_id = 0; array_id < hk->params.r; array_id++) { + uint64_t hash = h1 + array_id * h2; + bucket = &(hk->sketch[array_id * hk->params.w + (hash % hk->params.w)]); + + if (bucket->finger_print == fp) { + maxv = MAX(maxv, bucket->count); + } + } + + return maxv; +} + +void heavy_keeper_merge(struct heavy_keeper *dest, const struct heavy_keeper *src) { + assert(dest->K == src->K); + + heavy_keeper_merge_sketch(dest, src); + + struct sorted_set *new_rec = sorted_set_new(dest->K); // merging result + sorted_set_schema_set(new_rec, dest->new_fn, dest->free_fn, dest->merge_fn, dest->reset_fn, dest->copy_fn); + const struct sorted_set *ss_dest = dest->top_K_heap; // original ss in dest + const struct sorted_set *ss_src = src->top_K_heap; // the ss to be merged + int size_dest = sorted_set_cardinality(ss_dest); + int size_src = sorted_set_cardinality(ss_src); + int max_size = size_dest > size_src ? size_dest : size_src; + + void **exdatas_dst = (void **)calloc(size_dest, sizeof(void *)); + char **key_arr = (char **)malloc(sizeof(char *) * max_size); + size_t *key_lens = (size_t *)malloc(sizeof(size_t) * max_size); + sorted_set_dump(ss_dest, key_arr, key_lens, exdatas_dst); + + + /* ------------------------------ merge dest ------------------------------ */ + for (int i = 0; i < size_dest; i++) { + long long maxv = find_count_in_sketch(dest, key_arr[i], key_lens[i]); + sorted_set_insert_to_available_heap(new_rec, key_arr[i], key_lens[i], maxv, dest->copy_fn(exdatas_dst[i])); + } + + /* ------------------------------ merge src ------------------------------ */ + void **exdatas_src = (void **)calloc(size_src, sizeof(void *)); + sorted_set_dump(ss_src, key_arr, key_lens, exdatas_src); + + for (int i = 0; i < size_src; i++) { + const heap_entry *entry = sorted_set_find_entry(new_rec, key_arr[i], key_lens[i]); + if (entry != NULL) { // the key is in both dest and src, so has been processed in the previous loop. 
The reason why no need to sum up result is that merged sketch already keeps its summed up count + void *exdata_new = sorted_set_entry_get_data(entry)->exdata; + void *exdata_src = exdatas_src[i]; + dest->merge_fn(exdata_new, exdata_src); + + continue; + } + + long long cnt = find_count_in_sketch(dest, key_arr[i], key_lens[i]);// the cnt is the estimated count in the merged sketch, since the dest heavy keeper has been merged + if (sorted_set_check_is_full(new_rec)) { + long long mincnt_new = sorted_set_get_min_score(new_rec); + if (cnt > mincnt_new) { + sorted_set_pop(new_rec); + sorted_set_insert_to_available_heap(new_rec, key_arr[i], key_lens[i], cnt, dest->copy_fn(exdatas_src[i])); + } + } else { + sorted_set_insert_to_available_heap(new_rec, key_arr[i], key_lens[i], cnt, dest->copy_fn(exdatas_src[i])); + } + } + + free(key_arr); + free(key_lens); + free(exdatas_dst); + free(exdatas_src); + sorted_set_free(dest->top_K_heap); + dest->top_K_heap = new_rec; +} + +struct heavy_keeper *heavy_keeper_copy(const struct heavy_keeper *src) { + struct heavy_keeper *ret = (struct heavy_keeper *)malloc(sizeof(struct heavy_keeper)); + ret->K = src->K; + ret->rand_state = 0; + + ret->top_K_heap = sorted_set_copy(src->top_K_heap); + + ret->params = src->params; + ret->sketch = (struct Bucket *)malloc(sizeof(struct Bucket) * (size_t)ret->params.r * (size_t)ret->params.w); + memcpy(ret->sketch, src->sketch, sizeof(struct Bucket) * (size_t)ret->params.r * (size_t)ret->params.w); + + ret->new_fn = src->new_fn; + ret->free_fn = src->free_fn; + ret->merge_fn = src->merge_fn; + ret->reset_fn = src->reset_fn; + ret->copy_fn = src->copy_fn; + + return ret; +} + +void heavy_keeper_one_point_query(const struct heavy_keeper *hk, const char *key, size_t key_len, long long *count_out, void **exdata_out) { + *count_out = 0; + *exdata_out = NULL; + + const heap_entry *entry = sorted_set_find_entry(hk->top_K_heap, key, key_len); + if (entry == NULL) { + return; + } + *count_out = sorted_set_entry_get_score(entry); + *exdata_out = sorted_set_entry_get_data(entry)->exdata; +}
\ No newline at end of file diff --git a/src/cells/heavy_keeper.h b/src/cells/heavy_keeper.h new file mode 100644 index 0000000..3b09598 --- /dev/null +++ b/src/cells/heavy_keeper.h @@ -0,0 +1,48 @@ +#pragma once + +#include <stddef.h> + +#ifdef __cplusplus +extern "C"{ +#endif + +#include "exdata.h" + + +struct heavy_keeper; + +struct heavy_keeper *heavy_keeper_new(int max_query_num); +/** + * @brief free a heavy keeper. + * @param hk the pointer to the heavy keeper. + */ +void heavy_keeper_free(struct heavy_keeper *hk); + +/* + * @brief clear a heavy keeper. It will clear all the data in the heavy keeper. as for every exdata, use the reset function `exdata_reset_cb` to reset it. + * @param hk the pointer to the heavy keeper. +*/ +void heavy_keeper_reset(struct heavy_keeper *hk); + +int heavy_keeper_add(struct heavy_keeper *hk, const char *key, size_t key_len, long long count, void *arg); + +int heavy_keeper_set_exdata_schema(struct heavy_keeper *hk, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); + +void *heavy_keeper_get0_exdata(const struct heavy_keeper *hk, const char *key, size_t key_len); + +// get the number of cells in the heavy keeper +int heavy_keeper_get_count(const struct heavy_keeper *hk); + +size_t heavy_keeper_list(const struct heavy_keeper *hk, void **exdatas, size_t n_exdatas); + +void heavy_keeper_merge(struct heavy_keeper *dest, const struct heavy_keeper *src); + +struct heavy_keeper *heavy_keeper_copy(const struct heavy_keeper *src); + +// for test +void heavy_keeper_one_point_query(const struct heavy_keeper *hk, const char *key, size_t key_len, long long *count_out, void **exdata_out); + + +#ifdef __cplusplus +} +#endif
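A minimal usage sketch for the HeavyKeeper top-K tracker (illustrative only, not part of this commit), feeding a synthetic skewed workload and then querying a single key with the test-only one-point query.

```c
/* Illustrative sketch, not part of the commit. */
#include <stdio.h>
#include <string.h>
#include "heavy_keeper.h"

int main(void) {
    struct heavy_keeper *hk = heavy_keeper_new(10);      /* track the top-10 keys */

    char key[32];
    for (int flow = 0; flow < 1000; flow++) {
        int n = snprintf(key, sizeof(key), "flow-%d", flow);
        /* skewed input: low-numbered flows are the elephants */
        heavy_keeper_add(hk, key, (size_t)n, 1000 / (flow + 1), NULL);
    }

    long long count = 0;
    void *exdata = NULL;
    heavy_keeper_one_point_query(hk, "flow-0", strlen("flow-0"), &count, &exdata);
    printf("estimated count of flow-0: %lld (tracked keys: %d)\n",
           count, heavy_keeper_get_count(hk));

    heavy_keeper_free(hk);
    return 0;
}
```

Per the comment in heavy_keeper.c, heavy_keeper_add returns 1 when the key ends up tracked in the top-K heap, 0 when it is not added, and -1 for unexpected cases.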
\ No newline at end of file diff --git a/src/cells/spread_sketch.c b/src/cells/spread_sketch.c new file mode 100644 index 0000000..e79815b --- /dev/null +++ b/src/cells/spread_sketch.c @@ -0,0 +1,366 @@ +#include <stdbool.h> +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <math.h> +#include <assert.h> + +#include "xxhash/xxhash.h" +#include "uthash.h" + +#include "spread_sketch.h" +#include "exdata.h" + +// todo:把primary metric 记到sketch 里,且使用特殊的st Hyperloglog + +struct entry { + int ref_count; + void *exdata; + bool dying; + char *key; + size_t key_len; + UT_hash_handle hh; +}; + +struct spread_sketch_scheme { + exdata_new_cb new_fn; + exdata_free_cb free_fn; + exdata_merge_cb merge_fn; + exdata_reset_cb reset_fn; + exdata_copy_cb copy_fn; +}; + +struct entry_table { + struct entry *entry; + + struct spread_sketch_scheme scheme; +}; + +struct bucket { + struct entry *content; + uint32_t level; +}; + +struct spread_sketch { + int depth; + int width; + + struct spread_sketch_scheme scheme; + + struct bucket *buckets; + struct entry_table *table; + + uint32_t *min_level_per_row; // TODO: 先看看性能吧, 之后再写。用来记录每行最小的level,从而跳过行数。对于64位的level,维持一个计数,额外使用64 r的空间,当一个最小位数的level 计数到0时,更新最小level。 + // TODO: 对比heavy keeper,不仅仅是跳过的问题,heavykeeper 无论什么情况,在输入0的时候都不会走sketch 更新。 + // 或者简单记录用掉的bucket 数量也挺好。 +}; + +static void *default_new_fn(void *arg) { + return NULL; +} +static void default_free_fn(void *exdata) { + return; +} +static void default_merge_fn(void *dest, void *src) { + return; +} +static void default_reset_fn(void *exdata) { + return; +} +static void *default_copy_fn(void *exdata) { + return exdata; +} +static inline bool key_equal(const char *key1, size_t key1_len, const char *key2, size_t key2_len) { + if (key1_len != key2_len) { + return false; + } + return memcmp(key1, key2, key1_len) == 0; +} +static inline char *key_dup(const char *key, size_t key_len) { + char *ret = malloc(key_len+1); + memcpy(ret, key, key_len); + ret[key_len] = '\0'; + return ret; +} + +struct entry *smart_ptr_table_get(struct entry_table *table, const char *key, size_t key_len, void *arg) { + struct entry *ret; + HASH_FIND(hh, table->entry, key, key_len, ret); + + if (ret != NULL) { + ret->dying = false; + ret->ref_count++; + } else { + ret = malloc(sizeof(struct entry)); + ret->dying = false; + ret->ref_count = 1; + ret->key = key_dup(key, key_len); + ret->key_len = key_len; + if (arg == NULL) { + ret->exdata = NULL; + } else { + ret->exdata = table->scheme.new_fn(arg); + } + HASH_ADD_KEYPTR(hh, table->entry, ret->key, ret->key_len, ret); + } + + return ret; +} + +int smart_ptr_table_release(struct entry_table *table, const char *key, size_t key_len) { + struct entry *ret; + HASH_FIND(hh, table->entry, key, key_len, ret); + if (ret == NULL) { + return -1; + } + + ret->ref_count--; + if (ret->ref_count == 0) { + // printf("release %s\n", key); + HASH_DEL(table->entry, ret); + table->scheme.free_fn(ret->exdata); + free(ret->key); + free(ret); + } + + return 0; +} + +void smart_ptr_table_free(struct entry_table *table) { + struct entry *current, *tmp; + HASH_ITER(hh, table->entry, current, tmp) { + HASH_DEL(table->entry, current); + table->scheme.free_fn(current->exdata); + free(current->key); + free(current); + } + free(table); +} + +struct entry_table *smart_ptr_table_new() { + struct entry_table *table = malloc(sizeof(struct entry_table)); + table->entry = NULL; + + table->scheme.new_fn = default_new_fn; + table->scheme.free_fn = default_free_fn; + table->scheme.merge_fn = default_merge_fn; + 
table->scheme.reset_fn = default_reset_fn; + table->scheme.copy_fn = default_copy_fn; + + return table; +} + +void get_parameter_recommendation(int max_super_spreader_number, int *depth_out, int *width_out) +{ + int logk = max_super_spreader_number >= 3200 ? 4 : 3; // lg3200 = 3.51,round up to 4 + *depth_out = logk; + + int w; + if (max_super_spreader_number <= 100) { + w = max_super_spreader_number * 3 /2; // * 1.5, when the number is small, we need more width + } else { + w = max_super_spreader_number + 50; // + 50: 100*1.5-100 = 50, make w=f(k) continuous + } + if (w < 40) { + w = 40; + } + *width_out = w; +} + +struct spread_sketch *spread_sketch_new(int expected_query_num) { + struct spread_sketch *pthis = malloc(sizeof(struct spread_sketch)); + get_parameter_recommendation(expected_query_num, &pthis->depth, &pthis->width); + + pthis->buckets = calloc(pthis->depth * pthis->width, sizeof(struct bucket)); + pthis->scheme.new_fn = default_new_fn; + pthis->scheme.free_fn = default_free_fn; + pthis->scheme.merge_fn = default_merge_fn; + pthis->scheme.reset_fn = default_reset_fn; + pthis->scheme.copy_fn = default_copy_fn; + pthis->table = smart_ptr_table_new(); + pthis->table->scheme = pthis->scheme; + + return pthis; +} + +// return 0 if not added, return 1 if added +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg) { + // uint64_t hash_identifier = XXH3_64bits_withSeed(identifier, identifier_length, 171); + uint32_t level = (uint32_t)__builtin_clzll(item_hash) + 1; + // printf("spread_sketch_add key %s, level %u\n", key, level); + + // https://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf + // A technique from the hashing literature is to use two hash functions h1(x) and h2(x) to simulate additional hash functions of the form gi(x) = h1(x) + ih2(x) + // Assuming that the 128-bit xxhash function is perfect, we can view it as a combination of two 64-bit hash functions. + uint64_t hash_x_tmp = XXH3_64bits_withSeed(key, key_length, 171); + uint32_t hash_x1 = (uint32_t) (hash_x_tmp >> 32); + uint32_t hash_x2 = (uint32_t) hash_x_tmp; + + bool in_sketch = false; + for (int i = 0; i < ss->depth; i++) { + uint32_t hash_x = hash_x1 + i * hash_x2; + int bucket_idx = (hash_x % ss->width) + i * ss->width; + struct bucket *bucket = &ss->buckets[bucket_idx]; + + if (bucket->content != NULL && key_equal(bucket->content->key, bucket->content->key_len, key, key_length)) { + bucket->content->dying = false; + + if (bucket->level < level) { + bucket->level = level; + } + in_sketch = true; + } else { + uint32_t old_level = bucket->content == NULL ? 0: bucket->level; + + if (level > old_level) { + // printf("update key %s to %s, and level %u to %u, in bucket (r,w)=(%d,%u)\n", bucket->content == NULL ? "NULL": bucket->content->key, key, old_level, level, i, hash_x % ss->width); + const struct entry *content_old = bucket->content; + if (content_old != NULL) { + smart_ptr_table_release(ss->table, content_old->key, content_old->key_len); + } + struct entry *content_new = smart_ptr_table_get(ss->table, key, key_length, arg); + bucket->content = content_new; + bucket->level = level; + + in_sketch = true; + } + } + } + + return in_sketch ? 
1 : 0; +} + +void spread_sketch_free(struct spread_sketch *ss) { + smart_ptr_table_free(ss->table); + free(ss->buckets); + free(ss); +} + +void spread_sketch_merge(struct spread_sketch *dst, const struct spread_sketch *src) +{ + assert(dst->depth == src->depth && dst->width == src->width); + + for (int i = 0; i < dst->depth * dst->width; i++) { + const struct bucket *bucket_src = &src->buckets[i]; + struct bucket *bucket_dst = &dst->buckets[i]; + + if (bucket_src->content == NULL || bucket_src->content->dying) { + continue; + } + if (bucket_dst->content == NULL) { + bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); + bucket_dst->level = bucket_src->level; + continue; + } + bucket_dst->content->dying = false; + + if (key_equal(bucket_src->content->key, bucket_src->content->key_len, bucket_dst->content->key, bucket_dst->content->key_len)) { + if (bucket_src->level > bucket_dst->level) { + bucket_dst->level = bucket_src->level; + } + } else { + if (bucket_src->level > bucket_dst->level) { + smart_ptr_table_release(dst->table, bucket_dst->content->key, bucket_dst->content->key_len); + bucket_dst->content = smart_ptr_table_get(dst->table, bucket_src->content->key, bucket_src->content->key_len, NULL); + bucket_dst->level = bucket_src->level; + } + } + } + + const struct spread_sketch_scheme *scheme = &dst->table->scheme; + + struct entry *content_dest, *content_src, *tmp; + HASH_ITER(hh, dst->table->entry, content_dest, tmp) { + HASH_FIND(hh, src->table->entry, content_dest->key, content_dest->key_len, content_src); + if (content_src == NULL || content_src->dying) { + continue; + } + + if (content_dest->exdata == NULL) { + content_dest->exdata = scheme->copy_fn(content_src->exdata); + } else { + scheme->merge_fn(content_dest->exdata, content_src->exdata); + } + } +} + +void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len) { + struct entry *content; + HASH_FIND(hh, ss->table->entry, key, key_len, content); + if (content == NULL || content->dying) { + return NULL; + } + return content->exdata; +} + +void spread_sketch_reset(struct spread_sketch *ss) { + for (int i = 0; i < ss->depth * ss->width; i++) { + struct bucket *bucket = &ss->buckets[i]; + bucket->level = 0; + } + + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + ss->scheme.reset_fn(content->exdata); + content->dying = true; + } +} + +void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn) { + ss->scheme.new_fn = new_fn; + ss->scheme.free_fn = free_fn; + ss->scheme.merge_fn = merge_fn; + ss->scheme.reset_fn = reset_fn; + ss->scheme.copy_fn = copy_fn; + + ss->table->scheme = ss->scheme; +} + +int spread_sketch_get_count(const struct spread_sketch *ss) { + int cnt = 0; + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + if (!content->dying) { + cnt++; + } + } + + return cnt; +} + +size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas) { + size_t count = 0; + struct entry *content, *tmp; + HASH_ITER(hh, ss->table->entry, content, tmp) { + if (content->dying) { + continue; + } + if (count >= n_exdatas) { + break; + } + exdatas[count] = content->exdata; + count++; + } + return count; +} + +struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src) { + struct spread_sketch *dst = 
malloc(sizeof(struct spread_sketch)); + memcpy(dst, src, sizeof(struct spread_sketch)); + + dst->buckets = calloc(dst->depth * dst->width, sizeof(struct bucket)); + dst->table = smart_ptr_table_new(); + spread_sketch_set_exdata_schema(dst, src->scheme.new_fn, src->scheme.free_fn, src->scheme.merge_fn, src->scheme.reset_fn, src->scheme.copy_fn); + + for (int i = 0; i < dst->depth * dst->width; i++) { + if (src->buckets[i].content == NULL || src->buckets[i].content->dying) { + continue; + } + dst->buckets[i].content = smart_ptr_table_get(dst->table, src->buckets[i].content->key, src->buckets[i].content->key_len, NULL); + dst->buckets[i].level = src->buckets[i].level; + if (dst->buckets[i].content->exdata == NULL) { + dst->buckets[i].content->exdata = src->scheme.copy_fn(src->buckets[i].content->exdata); + } + } + return dst; +} diff --git a/src/cells/spread_sketch.h b/src/cells/spread_sketch.h new file mode 100644 index 0000000..9717238 --- /dev/null +++ b/src/cells/spread_sketch.h @@ -0,0 +1,41 @@ +#pragma once + +#include <stddef.h> + +#ifdef __cplusplus +extern "C"{ +#endif + +#include "exdata.h" + +#define DUMMY_ITEM_HASH (1ULL<<63) // level(left most zeros) = 0 + +struct spread_sketch; + +// spread sketch alway store values more than expected_query_num,expected_query_num is a hint to set spread sketch parameters properly +struct spread_sketch *spread_sketch_new(int expected_query_num); + +void spread_sketch_free(struct spread_sketch *ss); + +void spread_sketch_reset(struct spread_sketch *ss); + +int spread_sketch_add(struct spread_sketch *ss, const char *key, size_t key_length, uint64_t item_hash, void *arg); +void spread_sketch_set_exdata_schema(struct spread_sketch *ss, exdata_new_cb new_fn, exdata_free_cb free_fn, exdata_merge_cb merge_fn, exdata_reset_cb reset_fn, exdata_copy_cb copy_fn); + +void *spread_sketch_get0_exdata(const struct spread_sketch *ss, const char *key, size_t key_len); + +// get the number of cells in the heavy keeper +int spread_sketch_get_count(const struct spread_sketch *ss); + +size_t spread_sketch_list(const struct spread_sketch *ss, void **exdatas, size_t n_exdatas); + +void spread_sketch_merge(struct spread_sketch *dest, const struct spread_sketch *src); + +struct spread_sketch *spread_sketch_copy(const struct spread_sketch *src); + +// for test +// void spread_sketch_one_point_query(const struct spread_sketch *ss, const char *key, size_t key_len, int *level_out, void **exdata_out); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file |
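A minimal usage sketch for the spread_sketch super-spreader detector (illustrative only, not part of this commit). The caller supplies a 64-bit hash of each per-key item (here, one destination address per packet); spread_sketch_add derives the bucket level from that hash's leading zeros, and DUMMY_ITEM_HASH (no leading zeros) maps to the lowest level. The xxhash include path mirrors the one used in spread_sketch.c.

```c
/* Illustrative sketch, not part of the commit. */
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include "xxhash/xxhash.h"
#include "spread_sketch.h"

int main(void) {
    struct spread_sketch *ss = spread_sketch_new(50);    /* expect roughly 50 super spreaders */

    const char *src = "10.0.0.1";
    char dst[32];
    for (int i = 0; i < 500; i++) {                       /* one source contacting many destinations */
        int n = snprintf(dst, sizeof(dst), "192.168.%d.%d", i / 256, i % 256);
        uint64_t item_hash = XXH3_64bits(dst, (size_t)n);
        spread_sketch_add(ss, src, strlen(src), item_hash, NULL);
    }

    printf("candidate super spreaders tracked: %d\n", spread_sketch_get_count(ss));
    spread_sketch_free(ss);
    return 0;
}
```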
