diff options
| author | chenzizhan <[email protected]> | 2024-08-09 17:10:27 +0800 |
|---|---|---|
| committer | chenzizhan <[email protected]> | 2024-08-09 17:10:27 +0800 |
| commit | fb406eaf6166125575c1e957b2fbeaa881766851 (patch) | |
| tree | ee70a29624ac50cd92f020e97a447f68070510e5 /src | |
| parent | ab340ae375f5dde4bc7a85c86855e545de10abca (diff) | |
TSG-21840 v4.4.1
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/swarmkv.c | 24 | ||||
| -rw-r--r-- | src/swarmkv_api.c | 36 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 7 | ||||
| -rw-r--r-- | src/swarmkv_store.h | 3 | ||||
| -rw-r--r-- | src/t_spread_sketch.c | 307 | ||||
| -rw-r--r-- | src/t_spread_sketch.h | 11 |
7 files changed, 388 insertions, 2 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8ee7449..84fe726 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -26,7 +26,7 @@ add_definitions(-fPIC) set(SWARMKV_SRC swarmkv_cmd_spec.c swarmkv.c swarmkv_api.c swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c - t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c + t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c t_spread_sketch.c swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c) set(LIB_SOURCE_FILES diff --git a/src/swarmkv.c b/src/swarmkv.c index d3013ef..34f3ae8 100644 --- a/src/swarmkv.c +++ b/src/swarmkv.c @@ -26,6 +26,7 @@ #include "t_bloom_filter.h" #include "t_cms.h" #include "t_hyperloglog.h" +#include "t_spread_sketch.h" #include "uthash.h" #include "sds.h" @@ -1177,6 +1178,29 @@ void command_spec_init(struct swarmkv *db) 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, pfinfo_command, db->mod_store); + /*Spread sketch commands*/ + swarmkv_command_table_register(db->mod_command_table, "SSINITBYDIM", "key width depth precision [TIME window-milliseconds]", + 4, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + ssinitbydim_command, db->mod_store); + swarmkv_command_table_register(db->mod_command_table, "SSINITBYCAPACITY", "key capacity precision [TIME window-milliseconds]*/", + 3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + ssinitbycapacity_command, db->mod_store); + swarmkv_command_table_register(db->mod_command_table, "SSADD", "key entry item [item ...] 
", + 3, 1, CMD_KEY_RW, REPLY_INT_MINORS1, AUTO_ROUTE, + ssadd_command, db->mod_store); + swarmkv_command_table_register(db->mod_command_table, "SSLIST", "key", + 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + sslist_command, db->mod_store); + swarmkv_command_table_register(db->mod_command_table, "SSQUERY", "key entry", + 2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE, + ssquery_command, db->mod_store); + swarmkv_command_table_register(db->mod_command_table, "SSMQUERY", "key entry [entry ...]", + 2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + ssmquery_command, db->mod_store); + swarmkv_command_table_register(db->mod_command_table, "SSINFO", "key", + 1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE, + ssinfo_command, db->mod_store); + /* Debug Commands */ swarmkv_command_table_register(db->mod_command_table, "INFO", "[section]", 0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE, diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c index 882721f..00147e8 100644 --- a/src/swarmkv_api.c +++ b/src/swarmkv_api.c @@ -579,4 +579,38 @@ void swarmkv_cmsmquery(struct swarmkv *db, const char *key, size_t keylen, const } swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2 + n_items, argv, argv_len); return; -}
\ No newline at end of file +} + +void swarmkv_ssadd(struct swarmkv *db, const char *key, size_t keylen, const char *item, const size_t item_len, const char *members[], size_t members_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + const char *argv[3 + n_member]; + size_t argv_len[3 + n_member]; + argv[0] = "SSADD"; + argv_len[0] = strlen(argv[0]); + argv[1] = key; + argv_len[1] = keylen; + argv[2] = item; + argv_len[2] = item_len; + for (size_t i = 0; i < n_member; i++) + { + argv[3 + i] = members[i]; + argv_len[3 + i] = members_len[i]; + } + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 3 + n_member, argv, argv_len); +} + +void swarmkv_ssmquery(struct swarmkv *db, const char *key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + const char *argv[2 + n_items]; + size_t argv_len[2 + n_items]; + argv[0] = "SSMQUERY"; + argv_len[0] = strlen(argv[0]); + argv[1] = key; + argv_len[1] = keylen; + for (size_t i = 0; i < n_items; i++) + { + argv[2 + i] = items[i]; + argv_len[2 + i] = items_len[i]; + } + swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2 + n_items, argv, argv_len); +} diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 8d0d563..d3319a7 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -112,6 +112,13 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = .obj_merge_blob = (void (*)(void *, const char *, size_t))ST_hyperloglog_merge_blob, .obj_replicate = (void *(*)(uuid_t, const char *, size_t))ST_hyperloglog_replicate, .obj_size = (size_t(*)(const void *))ST_hyperloglog_mem_size}, + {.type=OBJ_TYPE_SPREAD_SKETCH, + .type_name="spread-sketch", + .obj_free=(void (*)(void *))spread_sketch_free, + .obj_serialize=(void (*)(const void *, char **, size_t *))spread_sketch_serialize, + .obj_merge_blob=(void (*)(void *, const char *, size_t))spread_sketch_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, 
size_t))spread_sketch_replicate, + .obj_size=(size_t (*)(const void *))spread_sketch_calculate_memory_usage}, {.type = OBJ_TYPE_UNDEFINED, .type_name = "undefined", .obj_free = undefined_obj_free, diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h index 0be7b61..6bde09a 100644 --- a/src/swarmkv_store.h +++ b/src/swarmkv_store.h @@ -16,6 +16,7 @@ #include "ap_bloom.h" #include "cm_sketch.h" #include "st_hyperloglog.h" +#include "spread_sketch.h" enum sobj_type { @@ -29,6 +30,7 @@ enum sobj_type OBJ_TYPE_BLOOM_FILTER, OBJ_TYPE_CMS, OBJ_TYPE_HYPERLOGLOG, + OBJ_TYPE_SPREAD_SKETCH, OBJ_TYPE_UNDEFINED, __SWARMKV_OBJ_TYPE_MAX }; @@ -49,6 +51,7 @@ struct sobj struct AP_bloom *bloom; struct CM_sketch *cms; struct ST_hyperloglog *hll; + struct spread_sketch *spread_sketch; void *raw; }; }; diff --git a/src/t_spread_sketch.c b/src/t_spread_sketch.c new file mode 100644 index 0000000..d6a7131 --- /dev/null +++ b/src/t_spread_sketch.c @@ -0,0 +1,307 @@ +#include <stdlib.h> +#include <assert.h> +#include <math.h> + +#include "swarmkv_common.h" +#include "swarmkv_utils.h" +#include "swarmkv_store.h" +#include "swarmkv_error.h" +#include "spread_sketch.h" + +enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + /* SSINITBYDIM key width depth precision [TIME window-milliseconds]*/ + const sds key = cmd->argv[1]; + + long width = strtol(cmd->argv[2], NULL, 10); + if (width <= 0) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]); + return FINISHED; + } + long depth = strtol(cmd->argv[3], NULL, 10); + if (depth <= 0) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]); + return FINISHED; + } + long precision = strtol(cmd->argv[4], NULL, 10); + if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]); + return FINISHED; + } + long long 
time_window_ms = 0; + if (cmd->argc == 7) + { + if (strncasecmp(cmd->argv[5], "TIME", 4) != 0) + { + *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[5], "TIME"); + return FINISHED; + } + if (str2integer(cmd->argv[6], &time_window_ms) < 0) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[6]); + return FINISHED; + } + } + + struct sobj *obj = store_lookup(mod_store, key); + if (!obj) + { + return NEED_KEY_ROUTE; + } + + if (obj->type == OBJ_TYPE_UNDEFINED) + { + assert(obj->raw == NULL); + + struct timeval now; + gettimeofday(&now, NULL); + obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now); + + obj->type = OBJ_TYPE_SPREAD_SKETCH; + *reply = swarmkv_reply_new_status("OK"); + } + else + { + *reply = swarmkv_reply_new_array(0); + } + return FINISHED; +} + +enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + /* SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]*/ + const sds key = cmd->argv[1]; + long expected_superspreader_count = strtol(cmd->argv[2], NULL, 10); + if (expected_superspreader_count <= 0) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]); + return FINISHED; + } + long precision = strtol(cmd->argv[3], NULL, 10); + if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]); + return FINISHED; + } + long long time_window_ms = 0; // when set to 0, it means no time decay is used + if (cmd->argc == 6) + { + if (strncasecmp(cmd->argv[4], "TIME", 4) != 0) + { + *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[4], "TIME"); + return FINISHED; + } + if (str2integer(cmd->argv[5], &time_window_ms) < 0) + { + *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[5]); + return FINISHED; + } + } + + struct sobj *obj = 
store_lookup(mod_store, key); + if (!obj) + { + return NEED_KEY_ROUTE; + } + + if (obj->type == OBJ_TYPE_UNDEFINED) + { + assert(obj->raw == NULL); + + struct timeval now; + gettimeofday(&now, NULL); + + int width, depth; + unsigned char precision_dummy; + spread_sketch_recommend_parameters(expected_superspreader_count, &width, &depth, &precision_dummy); + obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now); + + obj->type = OBJ_TYPE_SPREAD_SKETCH; + *reply = swarmkv_reply_new_status("OK"); + } + else + { + *reply = swarmkv_reply_new_array(0); + } + return FINISHED; +} + +enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + // SSADD key entry item [item ...] + const sds key = cmd->argv[1]; + struct sobj *obj = store_lookup(mod_store, key); + if (!obj) + { + return NEED_KEY_ROUTE; + } + if (obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + if (obj->type != OBJ_TYPE_SPREAD_SKETCH) + { + *reply = swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + const sds entry_key = cmd->argv[2]; + + struct timeval now; + gettimeofday(&now, NULL); + for (size_t i = 3; i < cmd->argc; i++) + { + spread_sketch_add(obj->spread_sketch, entry_key, sdslen(entry_key), cmd->argv[i], sdslen(cmd->argv[i]), NULL, now); + } + + double est = spread_sketch_query(obj->spread_sketch, entry_key, sdslen(entry_key), now); + *reply = swarmkv_reply_new_integer((int)(est + 0.5)); + + store_mark_object_as_modified(mod_store, obj); + return FINISHED; +} + +enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + // SSLIST key + const sds key = cmd->argv[1]; + + struct sobj *obj = store_lookup(mod_store, key); + if (!obj) + { + return NEED_KEY_ROUTE; + } + if (obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + if (obj->type 
!= OBJ_TYPE_SPREAD_SKETCH) + { + *reply = swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + struct spread_sketch *ss = obj->spread_sketch; + size_t n_entry; + char **entry_keys = NULL; + size_t *entry_keys_len = NULL; + spread_sketch_list_entries(ss, &entry_keys, &entry_keys_len, &n_entry); + + *reply = swarmkv_reply_new_array(n_entry * 2); + for (size_t i = 0; i < n_entry; i++) + { + const char *entry_tmp = entry_keys[i]; + size_t len_tmp = entry_keys_len[i]; + int est_tmp = (int)(spread_sketch_get_cardinality(ss, entry_tmp, len_tmp) + 0.5); + + (*reply)->elements[i * 2] = swarmkv_reply_new_string(entry_tmp, len_tmp); + (*reply)->elements[i * 2 + 1] = swarmkv_reply_new_integer(est_tmp); + } + + free(entry_keys); + free(entry_keys_len); + return FINISHED; +} + +enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + // SSMQUERY key entry [entry ...] + const sds key = cmd->argv[1]; + struct sobj *obj = store_lookup(mod_store, key); + if (!obj) + { + return NEED_KEY_ROUTE; + } + if (obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + if (obj->type != OBJ_TYPE_SPREAD_SKETCH) + { + *reply = swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + struct spread_sketch *ss = obj->spread_sketch; + struct timeval now; + gettimeofday(&now, NULL); + *reply = swarmkv_reply_new_array(cmd->argc - 2); + for (size_t i = 2; i < cmd->argc; i++) + { + const sds entry_key = cmd->argv[i]; + double est = spread_sketch_query(ss, entry_key, sdslen(entry_key), now); + (*reply)->elements[i - 2] = swarmkv_reply_new_integer((int)(est + 0.5)); + } + return FINISHED; +} +enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + // SSQUERY key entry + enum cmd_exec_result ret; + struct swarmkv_reply *tmp_reply = NULL; + ret = ssmquery_command(mod_store, cmd, 
&tmp_reply); + if (ret == FINISHED) + { + if (tmp_reply->type == SWARMKV_REPLY_ARRAY) + { + *reply = swarmkv_reply_dup(tmp_reply->elements[0]); + swarmkv_reply_free(tmp_reply); + } + else + { + *reply = tmp_reply; + } + } + return ret; +} +enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply) +{ + // SSINFO key + const sds key = cmd->argv[1]; + + struct sobj *obj = store_lookup(mod_store, key); + if (!obj) + { + return NEED_KEY_ROUTE; + } + if (obj->type == OBJ_TYPE_UNDEFINED) + { + return handle_undefined_object(obj, reply); + } + if (obj->type != OBJ_TYPE_SPREAD_SKETCH) + { + *reply = swarmkv_reply_new_error(error_wrong_type); + return FINISHED; + } + + struct spread_sketch *ss = obj->spread_sketch; + int depth, width, time_window_ms; + unsigned char precision; + spread_sketch_get_parameter(ss, &depth, &width, &precision, &time_window_ms); + double error_cms = 2.0 / (double)width; + double error_hll = ST_hyperloglog_error_for_precision(precision); + double probability = 1 / pow(2, depth); + + int i = 0; + *reply = swarmkv_reply_new_array(14); + (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Width"); + (*reply)->elements[i++] = swarmkv_reply_new_integer(width); + (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Depth"); + (*reply)->elements[i++] = swarmkv_reply_new_integer(depth); + (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("Precision"); + (*reply)->elements[i++] = swarmkv_reply_new_integer(precision); + (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("TimeWindowMs"); + (*reply)->elements[i++] = swarmkv_reply_new_integer(time_window_ms); + (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("ErrorCMS"); + (*reply)->elements[i++] = swarmkv_reply_new_double(error_cms); + (*reply)->elements[i++] = swarmkv_reply_new_string_fmt("ErrorHLL"); + (*reply)->elements[i++] = swarmkv_reply_new_double(error_hll); + (*reply)->elements[i++] = 
swarmkv_reply_new_string_fmt("Probability"); + (*reply)->elements[i++] = swarmkv_reply_new_double(probability); + assert(i == 14); + + return FINISHED; +}
\ No newline at end of file diff --git a/src/t_spread_sketch.h b/src/t_spread_sketch.h new file mode 100644 index 0000000..e8f6180 --- /dev/null +++ b/src/t_spread_sketch.h @@ -0,0 +1,11 @@ +#pragma once +#include "swarmkv_common.h" +#include "swarmkv_cmd_spec.h" + +enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); +enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
\ No newline at end of file |
