summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorchenzizhan <[email protected]>2024-08-09 17:10:27 +0800
committerchenzizhan <[email protected]>2024-08-09 17:10:27 +0800
commitfb406eaf6166125575c1e957b2fbeaa881766851 (patch)
treeee70a29624ac50cd92f020e97a447f68070510e5 /src
parentab340ae375f5dde4bc7a85c86855e545de10abca (diff)
TSG-21840v4.4.1
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/swarmkv.c24
-rw-r--r--src/swarmkv_api.c36
-rw-r--r--src/swarmkv_store.c7
-rw-r--r--src/swarmkv_store.h3
-rw-r--r--src/t_spread_sketch.c307
-rw-r--r--src/t_spread_sketch.h11
7 files changed, 388 insertions, 2 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 8ee7449..84fe726 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -26,7 +26,7 @@ add_definitions(-fPIC)
set(SWARMKV_SRC swarmkv_cmd_spec.c swarmkv.c swarmkv_api.c
swarmkv_mesh.c swarmkv_rpc.c swarmkv_message.c swarmkv_net.c
swarmkv_sync.c swarmkv_store.c swarmkv_keyspace.c swarmkv_monitor.c
- t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c
+ t_string.c t_set.c t_token_bucket.c t_hash.c t_bloom_filter.c t_cms.c t_hyperloglog.c t_spread_sketch.c
swarmkv_common.c swarmkv_utils.c future_promise.c http_client.c)
set(LIB_SOURCE_FILES
diff --git a/src/swarmkv.c b/src/swarmkv.c
index d3013ef..34f3ae8 100644
--- a/src/swarmkv.c
+++ b/src/swarmkv.c
@@ -26,6 +26,7 @@
#include "t_bloom_filter.h"
#include "t_cms.h"
#include "t_hyperloglog.h"
+#include "t_spread_sketch.h"
#include "uthash.h"
#include "sds.h"
@@ -1177,6 +1178,29 @@ void command_spec_init(struct swarmkv *db)
1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
pfinfo_command, db->mod_store);
+    /* Spread sketch commands.
+     * Every registration passes the command name, a usage string describing
+     * the arguments that follow the name, two numbers, the key access mode,
+     * the default reply, the routing mode, the handler and the store module.
+     * NOTE(review): the first number looks like the count of mandatory
+     * arguments and the second like the key position (other commands pass
+     * KEY_OFFSET_NONE there) -- confirm against swarmkv_command_table_register. */
+    swarmkv_command_table_register(db->mod_command_table, "SSINITBYDIM", "key width depth precision [TIME window-milliseconds]",
+                                   4, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+                                   ssinitbydim_command, db->mod_store);
+    /* The usage string used to end with a stray comment terminator copied
+     * from the handler's syntax comment; it now matches SSINITBYDIM's form. */
+    swarmkv_command_table_register(db->mod_command_table, "SSINITBYCAPACITY", "key capacity precision [TIME window-milliseconds]",
+                                   3, 1, CMD_KEY_OW, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+                                   ssinitbycapacity_command, db->mod_store);
+    swarmkv_command_table_register(db->mod_command_table, "SSADD", "key entry item [item ...] ",
+                                   3, 1, CMD_KEY_RW, REPLY_INT_MINORS1, AUTO_ROUTE,
+                                   ssadd_command, db->mod_store);
+    swarmkv_command_table_register(db->mod_command_table, "SSLIST", "key",
+                                   1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+                                   sslist_command, db->mod_store);
+    swarmkv_command_table_register(db->mod_command_table, "SSQUERY", "key entry",
+                                   2, 1, CMD_KEY_RO, REPLY_INT_0, AUTO_ROUTE,
+                                   ssquery_command, db->mod_store);
+    swarmkv_command_table_register(db->mod_command_table, "SSMQUERY", "key entry [entry ...]",
+                                   2, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+                                   ssmquery_command, db->mod_store);
+    swarmkv_command_table_register(db->mod_command_table, "SSINFO", "key",
+                                   1, 1, CMD_KEY_RO, REPLY_EMPTY_ARRAY, AUTO_ROUTE,
+                                   ssinfo_command, db->mod_store);
+
/* Debug Commands */
swarmkv_command_table_register(db->mod_command_table, "INFO", "[section]",
0, KEY_OFFSET_NONE, CMD_KEY_NA, REPLY_NA, AUTO_ROUTE,
diff --git a/src/swarmkv_api.c b/src/swarmkv_api.c
index 882721f..00147e8 100644
--- a/src/swarmkv_api.c
+++ b/src/swarmkv_api.c
@@ -579,4 +579,38 @@ void swarmkv_cmsmquery(struct swarmkv *db, const char *key, size_t keylen, const
}
swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, 2 + n_items, argv, argv_len);
return;
-} \ No newline at end of file
+}
+
+/*
+ * Issues an asynchronous "SSADD key item member [member ...]" command.
+ *
+ * `item` is placed in the position the SSADD usage string calls "entry", and
+ * each element of `members` becomes one trailing argument. The argument
+ * vectors are variable-length arrays with automatic storage sized by
+ * `n_member`. `cb` and `cb_arg` are forwarded unchanged to
+ * swarmkv_async_command_on_argv().
+ */
+void swarmkv_ssadd(struct swarmkv *db, const char *key, size_t keylen, const char *item, const size_t item_len, const char *members[], size_t members_len[], size_t n_member, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+{
+    static const char cmd_name[] = "SSADD";
+    const size_t n_fixed = 3; /* command name, key, entry */
+    const size_t n_args = n_fixed + n_member;
+    const char *argv[n_args];
+    size_t argv_len[n_args];
+
+    argv[0] = cmd_name;
+    argv_len[0] = sizeof(cmd_name) - 1; /* length without the terminator */
+    argv[1] = key;
+    argv_len[1] = keylen;
+    argv[2] = item;
+    argv_len[2] = item_len;
+
+    /* Members occupy the slots right after the fixed arguments. */
+    size_t slot = n_fixed;
+    for (size_t m = 0; m < n_member; m++, slot++)
+    {
+        argv[slot] = members[m];
+        argv_len[slot] = members_len[m];
+    }
+
+    swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, n_args, argv, argv_len);
+}
+
+/*
+ * Issues an asynchronous "SSMQUERY key entry [entry ...]" command.
+ *
+ * Each element of `items` becomes one entry argument after the key. The
+ * argument vectors are variable-length arrays with automatic storage sized by
+ * `n_items`. `cb` and `cb_arg` are forwarded unchanged to
+ * swarmkv_async_command_on_argv().
+ */
+void swarmkv_ssmquery(struct swarmkv *db, const char *key, size_t keylen, const char *items[], const size_t items_len[], size_t n_items, swarmkv_on_reply_callback_t *cb, void *cb_arg)
+{
+    static const char cmd_name[] = "SSMQUERY";
+    const size_t n_fixed = 2; /* command name, key */
+    const size_t n_args = n_fixed + n_items;
+    const char *argv[n_args];
+    size_t argv_len[n_args];
+
+    argv[0] = cmd_name;
+    argv_len[0] = sizeof(cmd_name) - 1; /* length without the terminator */
+    argv[1] = key;
+    argv_len[1] = keylen;
+
+    /* Entries occupy the slots right after the fixed arguments. */
+    size_t slot = n_fixed;
+    for (size_t e = 0; e < n_items; e++, slot++)
+    {
+        argv[slot] = items[e];
+        argv_len[slot] = items_len[e];
+    }
+
+    swarmkv_async_command_on_argv(db, cb, cb_arg, NULL, n_args, argv, argv_len);
+}
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 8d0d563..d3319a7 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -112,6 +112,13 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
.obj_merge_blob = (void (*)(void *, const char *, size_t))ST_hyperloglog_merge_blob,
.obj_replicate = (void *(*)(uuid_t, const char *, size_t))ST_hyperloglog_replicate,
.obj_size = (size_t(*)(const void *))ST_hyperloglog_mem_size},
+ {.type=OBJ_TYPE_SPREAD_SKETCH,
+ .type_name="spread-sketch",
+ .obj_free=(void (*)(void *))spread_sketch_free,
+ .obj_serialize=(void (*)(const void *, char **, size_t *))spread_sketch_serialize,
+ .obj_merge_blob=(void (*)(void *, const char *, size_t))spread_sketch_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))spread_sketch_replicate,
+ .obj_size=(size_t (*)(const void *))spread_sketch_calculate_memory_usage},
{.type = OBJ_TYPE_UNDEFINED,
.type_name = "undefined",
.obj_free = undefined_obj_free,
diff --git a/src/swarmkv_store.h b/src/swarmkv_store.h
index 0be7b61..6bde09a 100644
--- a/src/swarmkv_store.h
+++ b/src/swarmkv_store.h
@@ -16,6 +16,7 @@
#include "ap_bloom.h"
#include "cm_sketch.h"
#include "st_hyperloglog.h"
+#include "spread_sketch.h"
enum sobj_type
{
@@ -29,6 +30,7 @@ enum sobj_type
OBJ_TYPE_BLOOM_FILTER,
OBJ_TYPE_CMS,
OBJ_TYPE_HYPERLOGLOG,
+ OBJ_TYPE_SPREAD_SKETCH,
OBJ_TYPE_UNDEFINED,
__SWARMKV_OBJ_TYPE_MAX
};
@@ -49,6 +51,7 @@ struct sobj
struct AP_bloom *bloom;
struct CM_sketch *cms;
struct ST_hyperloglog *hll;
+ struct spread_sketch *spread_sketch;
void *raw;
};
};
diff --git a/src/t_spread_sketch.c b/src/t_spread_sketch.c
new file mode 100644
index 0000000..d6a7131
--- /dev/null
+++ b/src/t_spread_sketch.c
@@ -0,0 +1,307 @@
+#include <assert.h>
+#include <errno.h>
+#include <limits.h>
+#include <math.h>
+#include <stdlib.h>
+#include <strings.h>
+
+#include "swarmkv_common.h"
+#include "swarmkv_utils.h"
+#include "swarmkv_store.h"
+#include "swarmkv_error.h"
+#include "spread_sketch.h"
+
+/*
+ * Parses `text` as a base-10 integer. `*out` is written only when the whole
+ * string was consumed, no overflow occurred and the value lies in [min, max].
+ * Returns 0 on success and -1 otherwise.
+ */
+static int ss_parse_long_in_range(const char *text, long min, long max, long *out)
+{
+    char *end = NULL;
+
+    errno = 0;
+    long value = strtol(text, &end, 10);
+    if (end == text || *end != '\0' || errno == ERANGE)
+    {
+        return -1; /* empty input, trailing garbage or overflow */
+    }
+    if (value < min || value > max)
+    {
+        return -1;
+    }
+    *out = value;
+    return 0;
+}
+
+/*
+ * Parses the optional trailing "TIME window-milliseconds" pair expected at
+ * argv[opt_idx] and argv[opt_idx + 1].
+ *
+ * `*time_window_ms` is set to 0 (no time decay) when the pair is absent.
+ * Returns 0 when the pair is absent or valid; returns -1 with `*reply` set to
+ * an error reply when the keyword is not exactly "TIME" (case-insensitive) or
+ * the window is not a non-negative integer.
+ *
+ * NOTE(review): as before, the pair is only recognised when argc is exactly
+ * opt_idx + 2; a lone trailing "TIME" or extra arguments are ignored rather
+ * than rejected -- confirm whether a wrong-argument-count error should be sent.
+ */
+static int ss_parse_time_option(const struct swarmkv_cmd *cmd, size_t opt_idx, long long *time_window_ms, struct swarmkv_reply **reply)
+{
+    *time_window_ms = 0;
+    if (cmd->argc != opt_idx + 2)
+    {
+        return 0;
+    }
+    /* Compare the whole token so that e.g. "TIMEOUT" is no longer accepted. */
+    if (strcasecmp(cmd->argv[opt_idx], "TIME") != 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_string_should_be, cmd->argv[opt_idx], "TIME");
+        return -1;
+    }
+    if (str2integer(cmd->argv[opt_idx + 1], time_window_ms) < 0 || *time_window_ms < 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[opt_idx + 1]);
+        return -1;
+    }
+    return 0;
+}
+
+/*
+ * SSINITBYDIM key width depth precision [TIME window-milliseconds]
+ *
+ * Creates a spread sketch with explicit dimensions under `key`.
+ * Replies with status "OK" when the sketch is created, with an empty array
+ * when the key already holds an object, and with an error reply when an
+ * argument is invalid. Returns NEED_KEY_ROUTE when the key is not found in
+ * this store, FINISHED otherwise.
+ */
+enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    const sds key = cmd->argv[1];
+    long width = 0;
+    long depth = 0;
+    long precision = 0;
+    long long time_window_ms = 0;
+
+    /* width/depth are capped at INT_MAX because the sketch parameters are
+     * handled as int elsewhere in this file (see SSINFO). */
+    if (ss_parse_long_in_range(cmd->argv[2], 1, INT_MAX, &width) < 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+        return FINISHED;
+    }
+    if (ss_parse_long_in_range(cmd->argv[3], 1, INT_MAX, &depth) < 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+        return FINISHED;
+    }
+    if (ss_parse_long_in_range(cmd->argv[4], HLL_MIN_PRECISION, HLL_MAX_PRECISION, &precision) < 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[4]);
+        return FINISHED;
+    }
+    if (ss_parse_time_option(cmd, 5, &time_window_ms, reply) < 0)
+    {
+        return FINISHED;
+    }
+
+    struct sobj *obj = store_lookup(mod_store, key);
+    if (!obj)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (obj->type == OBJ_TYPE_UNDEFINED)
+    {
+        assert(obj->raw == NULL);
+
+        struct timeval now;
+        gettimeofday(&now, NULL);
+        obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
+
+        obj->type = OBJ_TYPE_SPREAD_SKETCH;
+        *reply = swarmkv_reply_new_status("OK");
+        /* NOTE(review): SSADD calls store_mark_object_as_modified() after
+         * changing the object; confirm whether creation should as well. */
+    }
+    else
+    {
+        /* The key already holds an object: leave it untouched. */
+        *reply = swarmkv_reply_new_array(0);
+    }
+    return FINISHED;
+}
+
+/*
+ * SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]
+ *
+ * Creates a spread sketch under `key` whose width and depth come from
+ * spread_sketch_recommend_parameters() for the given capacity; the precision
+ * is the one supplied by the caller, not the recommended one.
+ * Replies and return values are the same as for SSINITBYDIM.
+ */
+enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    const sds key = cmd->argv[1];
+    long expected_superspreader_count = 0;
+    long precision = 0;
+    long long time_window_ms = 0; // when set to 0, it means no time decay is used
+
+    if (ss_parse_long_in_range(cmd->argv[2], 1, INT_MAX, &expected_superspreader_count) < 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[2]);
+        return FINISHED;
+    }
+    if (ss_parse_long_in_range(cmd->argv[3], HLL_MIN_PRECISION, HLL_MAX_PRECISION, &precision) < 0)
+    {
+        *reply = swarmkv_reply_new_error(error_arg_not_valid_integer, cmd->argv[3]);
+        return FINISHED;
+    }
+    if (ss_parse_time_option(cmd, 4, &time_window_ms, reply) < 0)
+    {
+        return FINISHED;
+    }
+
+    struct sobj *obj = store_lookup(mod_store, key);
+    if (!obj)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (obj->type == OBJ_TYPE_UNDEFINED)
+    {
+        assert(obj->raw == NULL);
+
+        struct timeval now;
+        gettimeofday(&now, NULL);
+
+        int width, depth;
+        unsigned char precision_dummy; /* recommended precision is discarded */
+        spread_sketch_recommend_parameters(expected_superspreader_count, &width, &depth, &precision_dummy);
+        obj->spread_sketch = spread_sketch_new(width, depth, precision, time_window_ms, now);
+
+        obj->type = OBJ_TYPE_SPREAD_SKETCH;
+        *reply = swarmkv_reply_new_status("OK");
+        /* NOTE(review): SSADD calls store_mark_object_as_modified() after
+         * changing the object; confirm whether creation should as well. */
+    }
+    else
+    {
+        /* The key already holds an object: leave it untouched. */
+        *reply = swarmkv_reply_new_array(0);
+    }
+    return FINISHED;
+}
+
+/*
+ * SSADD key entry item [item ...]
+ *
+ * Adds every item to `entry` in the spread sketch stored under `key`, then
+ * replies with the estimate returned by spread_sketch_query() for that entry,
+ * rounded to the nearest integer, and marks the object as modified.
+ * Returns NEED_KEY_ROUTE when the key is not found in this store. An
+ * undefined object is delegated to handle_undefined_object(); an object of
+ * another type produces a wrong-type error reply.
+ */
+enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *target = store_lookup(mod_store, cmd->argv[1]);
+    if (target == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (target->type == OBJ_TYPE_UNDEFINED)
+    {
+        return handle_undefined_object(target, reply);
+    }
+    else if (target->type != OBJ_TYPE_SPREAD_SKETCH)
+    {
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    const sds entry = cmd->argv[2];
+    const size_t entry_len = sdslen(entry);
+
+    /* One timestamp is shared by all insertions and the final query. */
+    struct timeval ts;
+    gettimeofday(&ts, NULL);
+
+    size_t idx = 3; /* first item argument */
+    while (idx < cmd->argc)
+    {
+        const sds item = cmd->argv[idx];
+        spread_sketch_add(target->spread_sketch, entry, entry_len, item, sdslen(item), NULL, ts);
+        idx++;
+    }
+
+    double estimate = spread_sketch_query(target->spread_sketch, entry, entry_len, ts);
+    *reply = swarmkv_reply_new_integer((int)(estimate + 0.5));
+
+    store_mark_object_as_modified(mod_store, target);
+    return FINISHED;
+}
+
+/*
+ * SSLIST key
+ *
+ * Replies with a flat array of 2 * n elements: for each entry reported by
+ * spread_sketch_list_entries(), its key string followed by the value of
+ * spread_sketch_get_cardinality() rounded to the nearest integer.
+ * Returns NEED_KEY_ROUTE when the key is not found in this store. An
+ * undefined object is delegated to handle_undefined_object(); an object of
+ * another type produces a wrong-type error reply.
+ */
+enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    const sds key = cmd->argv[1];
+
+    struct sobj *obj = store_lookup(mod_store, key);
+    if (!obj)
+    {
+        return NEED_KEY_ROUTE;
+    }
+    if (obj->type == OBJ_TYPE_UNDEFINED)
+    {
+        return handle_undefined_object(obj, reply);
+    }
+    if (obj->type != OBJ_TYPE_SPREAD_SKETCH)
+    {
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    struct spread_sketch *ss = obj->spread_sketch;
+    /* Start from an empty result so the reply size and the loop bound are
+     * well defined even if the callee leaves the outputs untouched. */
+    size_t n_entry = 0;
+    char **entry_keys = NULL;
+    size_t *entry_keys_len = NULL;
+    spread_sketch_list_entries(ss, &entry_keys, &entry_keys_len, &n_entry);
+
+    *reply = swarmkv_reply_new_array(n_entry * 2);
+    for (size_t i = 0; i < n_entry; i++)
+    {
+        const char *entry_tmp = entry_keys[i];
+        size_t len_tmp = entry_keys_len[i];
+        int est_tmp = (int)(spread_sketch_get_cardinality(ss, entry_tmp, len_tmp) + 0.5);
+
+        /* Even slots hold the entry key, odd slots its estimate. */
+        (*reply)->elements[i * 2] = swarmkv_reply_new_string(entry_tmp, len_tmp);
+        (*reply)->elements[i * 2 + 1] = swarmkv_reply_new_integer(est_tmp);
+    }
+
+    /* Only the two arrays are released here.
+     * NOTE(review): presumably the key strings stay owned by the sketch --
+     * confirm against spread_sketch_list_entries(). */
+    free(entry_keys);
+    free(entry_keys_len);
+    return FINISHED;
+}
+
+/*
+ * SSMQUERY key entry [entry ...]
+ *
+ * Replies with an array holding one integer per requested entry, in argument
+ * order: the value of spread_sketch_query() rounded to the nearest integer.
+ * Returns NEED_KEY_ROUTE when the key is not found in this store. An
+ * undefined object is delegated to handle_undefined_object(); an object of
+ * another type produces a wrong-type error reply.
+ */
+enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *target = store_lookup(mod_store, cmd->argv[1]);
+    if (target == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (target->type == OBJ_TYPE_UNDEFINED)
+    {
+        return handle_undefined_object(target, reply);
+    }
+    else if (target->type != OBJ_TYPE_SPREAD_SKETCH)
+    {
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    /* Every entry is queried against the same timestamp. */
+    struct timeval ts;
+    gettimeofday(&ts, NULL);
+
+    *reply = swarmkv_reply_new_array(cmd->argc - 2);
+
+    size_t out = 0; /* index into the reply array */
+    for (size_t arg = 2; arg < cmd->argc; arg++, out++)
+    {
+        const sds entry = cmd->argv[arg];
+        double estimate = spread_sketch_query(target->spread_sketch, entry, sdslen(entry), ts);
+        (*reply)->elements[out] = swarmkv_reply_new_integer((int)(estimate + 0.5));
+    }
+    return FINISHED;
+}
+/*
+ * SSQUERY key entry
+ *
+ * Runs ssmquery_command() on the same command. When that produces an array
+ * reply, its first element is duplicated as the reply and the array is
+ * freed; any other reply is passed through as is. When ssmquery_command()
+ * does not return FINISHED, `*reply` is left untouched and the result code is
+ * returned unchanged.
+ */
+enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct swarmkv_reply *multi = NULL;
+    enum cmd_exec_result status = ssmquery_command(mod_store, cmd, &multi);
+
+    if (status != FINISHED)
+    {
+        return status;
+    }
+    if (multi->type != SWARMKV_REPLY_ARRAY)
+    {
+        /* Not an array (e.g. an error reply): hand it over unchanged. */
+        *reply = multi;
+        return status;
+    }
+
+    /* Unwrap the single-entry result from the multi-query array. */
+    *reply = swarmkv_reply_dup(multi->elements[0]);
+    swarmkv_reply_free(multi);
+    return status;
+}
+/*
+ * SSINFO key
+ *
+ * Replies with a 14-element array of alternating field names and values:
+ * Width, Depth, Precision, TimeWindowMs (integers read through
+ * spread_sketch_get_parameter()), ErrorCMS (2 / width), ErrorHLL
+ * (ST_hyperloglog_error_for_precision(precision)) and Probability
+ * (1 / 2^depth).
+ * Returns NEED_KEY_ROUTE when the key is not found in this store. An
+ * undefined object is delegated to handle_undefined_object(); an object of
+ * another type produces a wrong-type error reply.
+ */
+enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply)
+{
+    struct sobj *target = store_lookup(mod_store, cmd->argv[1]);
+    if (target == NULL)
+    {
+        return NEED_KEY_ROUTE;
+    }
+
+    if (target->type == OBJ_TYPE_UNDEFINED)
+    {
+        return handle_undefined_object(target, reply);
+    }
+    else if (target->type != OBJ_TYPE_SPREAD_SKETCH)
+    {
+        *reply = swarmkv_reply_new_error(error_wrong_type);
+        return FINISHED;
+    }
+
+    int n_rows, n_cols, window_ms;
+    unsigned char hll_precision;
+    spread_sketch_get_parameter(target->spread_sketch, &n_rows, &n_cols, &hll_precision, &window_ms);
+
+    const double err_cms = 2.0 / (double)n_cols;
+    const double err_hll = ST_hyperloglog_error_for_precision(hll_precision);
+    const double prob = 1 / pow(2, n_rows);
+
+    enum { SSINFO_N_ELEMENTS = 14 }; /* seven name/value pairs */
+    struct swarmkv_reply *info = swarmkv_reply_new_array(SSINFO_N_ELEMENTS);
+    *reply = info;
+
+    int n = 0;
+    info->elements[n++] = swarmkv_reply_new_string_fmt("Width");
+    info->elements[n++] = swarmkv_reply_new_integer(n_cols);
+    info->elements[n++] = swarmkv_reply_new_string_fmt("Depth");
+    info->elements[n++] = swarmkv_reply_new_integer(n_rows);
+    info->elements[n++] = swarmkv_reply_new_string_fmt("Precision");
+    info->elements[n++] = swarmkv_reply_new_integer(hll_precision);
+    info->elements[n++] = swarmkv_reply_new_string_fmt("TimeWindowMs");
+    info->elements[n++] = swarmkv_reply_new_integer(window_ms);
+    info->elements[n++] = swarmkv_reply_new_string_fmt("ErrorCMS");
+    info->elements[n++] = swarmkv_reply_new_double(err_cms);
+    info->elements[n++] = swarmkv_reply_new_string_fmt("ErrorHLL");
+    info->elements[n++] = swarmkv_reply_new_double(err_hll);
+    info->elements[n++] = swarmkv_reply_new_string_fmt("Probability");
+    info->elements[n++] = swarmkv_reply_new_double(prob);
+    assert(n == SSINFO_N_ELEMENTS);
+
+    return FINISHED;
+} \ No newline at end of file
diff --git a/src/t_spread_sketch.h b/src/t_spread_sketch.h
new file mode 100644
index 0000000..e8f6180
--- /dev/null
+++ b/src/t_spread_sketch.h
@@ -0,0 +1,11 @@
+#pragma once
+#include "swarmkv_common.h"
+#include "swarmkv_cmd_spec.h"
+
+/*
+ * Spread sketch command handlers. Each handler takes the store module, the
+ * parsed command and an output reply pointer. Each returns NEED_KEY_ROUTE
+ * when the key is not found in the store and FINISHED once a reply is set.
+ */
+
+/* SSINITBYDIM key width depth precision [TIME window-milliseconds]
+ * Replies "OK" on creation, or an empty array if the key already holds an object. */
+enum cmd_exec_result ssinitbydim_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+/* SSINITBYCAPACITY key capacity precision [TIME window-milliseconds]
+ * Like SSINITBYDIM, with width and depth derived from the capacity. */
+enum cmd_exec_result ssinitbycapacity_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+/* SSADD key entry item [item ...]
+ * Adds the items to the entry and replies with the entry's rounded estimate. */
+enum cmd_exec_result ssadd_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+/* SSLIST key
+ * Replies with an array of alternating entry keys and rounded estimates. */
+enum cmd_exec_result sslist_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+/* SSINFO key
+ * Replies with an array of alternating field names and values for the sketch. */
+enum cmd_exec_result ssinfo_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+/* SSQUERY key entry
+ * Replies with the first element of the equivalent SSMQUERY result. */
+enum cmd_exec_result ssquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply);
+/* SSMQUERY key entry [entry ...]
+ * Replies with an array of rounded estimates, one per entry, in argument order. */
+enum cmd_exec_result ssmquery_command(struct swarmkv_module *mod_store, const struct swarmkv_cmd *cmd, struct swarmkv_reply **reply); \ No newline at end of file