summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author刘畅 <[email protected]>2023-04-04 12:27:33 +0000
committer刘畅 <[email protected]>2023-04-04 12:27:33 +0000
commit00d035db8063aef61076138b116fc06dde2ea4f0 (patch)
tree48e50859012bf900ba07bdf3ad9f85d2701d327a
parent9a0ff4d68c0d165ca4c65c850dfed9c2c7dd4c80 (diff)
parentd92e71f1082c9f38ca22e762d1dd7ba8fd7c0aa9 (diff)
Merge branch 'priority_by_swarmkv' into 'rel'
Priority by swarmkv See merge request tango/shaping-engine!7
-rw-r--r--shaping/include/shaper.h4
-rw-r--r--shaping/src/shaper.cpp117
-rw-r--r--shaping/test/gtest_shaper.cpp95
-rw-r--r--shaping/test/stub.cpp72
4 files changed, 245 insertions, 43 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 56e129f..41a7ea1 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -70,8 +70,10 @@ struct shaping_profile_info {
int priority;
int in_deposit_token;
int out_deposit_token;
- int async_ref_count;
+ int async_token_ref_count;
+ int async_queue_len_ref_count;
unsigned long long enqueue_time_us;//to calculate max latency
+ unsigned char is_priority_blocked;
unsigned char is_invalid;
};
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 51474e3..64775e0 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -27,6 +27,20 @@ extern "C" {
#define MICRO_SECONDS_PER_SEC 1000000
#define NANO_SECONDS_PER_SEC 1000000000
/*
 * HMGET command templates for reading per-priority queue lengths of one
 * shaping profile (key "tsg-shaping-<profile id>").  Template N requests
 * the counters for priorities 0..N-1, i.e. every priority strictly higher
 * than N, each built by appending one field to the previous template.
 */
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8"

/* Indexed by a profile's own priority (1..9); slot 0 is an empty string
 * because the highest priority has nothing above it to query. */
const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3,
                                           SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6,
                                           SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9};
+
struct shaper {//trees in one thread
struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority
};
@@ -39,6 +53,7 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree
struct shaping_async_cb_arg {
struct shaping_flow *sf;
struct shaping_profile_info *s_pf_info;
+ int priority;
unsigned char direction;
};
@@ -214,6 +229,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
/* No-op reply callback: used for fire-and-forget async swarmkv commands
 * (the HINCRBY queue-length updates) whose result is intentionally ignored. */
static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void *arg)
{
    (void)reply;
    (void)arg;
}
+
//return success(0) while any avl tree insert success
int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
@@ -232,6 +252,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
@@ -246,6 +267,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
@@ -285,6 +307,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
+
shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index);
@@ -302,6 +326,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
+ swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
+
shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id,
priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index);
@@ -368,7 +394,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
END:
free(cb_arg);
- __atomic_sub_fetch(&s_pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST);
+ __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
shaping_flow_free(sf);//sub ref count and decide if need to free
return;
@@ -405,7 +431,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow
struct shaping_async_cb_arg *arg;
char key[32] = {0};
- __atomic_add_fetch(&pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
@@ -415,7 +441,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow
arg->direction = direction;
swarmkv_tconsume(db, key, strlen(key), req_token, shaper_token_get_cb, arg);
- if (__atomic_load_n(&pf_info->async_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
+ if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
shaper_deposit_token_sub(pf_info, req_token, direction);
return 0;
}
@@ -436,57 +462,70 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow
return -1;
}
-#if 0
-int shaper_token_consume(struct shaping_flow *sf, unsigned int req_token, struct shaping_rule_info *s_rule_info)
+static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg)
{
- int i;
+ struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg;
+ struct shaping_profile_info *s_pf_info = arg->s_pf_info;
+ struct shaping_flow *sf = arg->sf;
+
+ s_pf_info->is_priority_blocked = 0;
- if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->primary, 1, req_token)) {
- return SHAPING_SUCCESS;
+ if (!reply || reply->type != SWARMKV_REPLY_ARRAY) {
+ goto END;
}
- if (s_rule_info->borrowing_num > 0) {
- for (i = 0; i < s_rule_info->borrowing_num; i++) {
- if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->borrowing[i], 0, req_token)) {
- return SHAPING_SUCCESS;
- }
+ for (unsigned int i = 0; i < reply->n_element; i++) {
+ if (reply->elements[i] && reply->elements[i]->integer > 0) {
+ s_pf_info->is_priority_blocked = 1;
+ break;
}
}
- return SHAPING_FAILED;
+END:
+ free(cb_arg);
+ __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
+ shaping_flow_free(sf);//sub ref count and decide if need to free
}
-#endif
-static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token,
- struct shaping_profile_info *profile, int profile_type, unsigned char direction)
+static int shaper_profile_is_priority_blocked(struct swarmkv *db, struct shaping_flow *sf, struct shaping_profile_info *profile)
{
- return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction);
-
-}
+ struct shaping_async_cb_arg *arg;
+ int priority = profile->priority;
-#if 0
-enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, struct shaper *sp, void *raw_pkt,
- unsigned int pkt_len, unsigned char direction, unsigned long long income_time)
-{
- int i;
- struct shaping_rule_info *s_rule_info;
-
- for (i = sf->anchor; i < sf->rule_num; i++) {
- s_rule_info = &sf->matched_rule_infos[i];
- if (-1 == shaper_token_consume(sf, pkt_len, s_rule_info, s_rule_info->primary.priority)) {
- sf->anchor = i;
- if (0 == shaper_flow_push(sf, sp)) {
- shaper_packet_enqueue(sf, raw_pkt, direction, income_time);
- return SHAPING_HOLD;
- } else {
- return SHAPING_DROP;
- }
+ if (priority == 0) {//highest priority, can't be blocked
+ return 0;
+ }
+
+ arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg));
+ arg->s_pf_info = profile;
+ arg->sf = sf;
+ arg->priority = priority;
+
+ __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
+
+ swarmkv_async_command(db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
+
+ if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
+ return 0;
+ } else {
+ if (profile->is_priority_blocked) {
+ return 1;
+ } else {
+ return 0;
}
}
+}
- return SHAPING_FORWARD;
+static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token,
+ struct shaping_profile_info *profile, int profile_type, unsigned char direction)
+{
+ if (shaper_profile_is_priority_blocked(db, sf, profile)) {
+ return -1;
+ } else {
+ return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction);
+ }
}
-#endif
int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[])
{
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 9c70e84..3f18a99 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -211,6 +211,8 @@ static void shaping_global_stat_judge(char *file_line, int tx_pkts, int tx_bytes
EXPECT_EQ(queueing_pkts, shaping_global_stat_field_get(metrics, "queueing_pkts"));
EXPECT_EQ(queueing_bytes, shaping_global_stat_field_get(metrics, "queueing_bytes"));
+ cJSON_Delete(json);
+
return;
}
@@ -1245,6 +1247,93 @@ TEST(two_session_same_rule, udp_tx_in_order)
profile_a: limit 1000
*/
/* Cross-thread priority test: two sessions use the same profile (id 0) but
 * run in different shaping threads; the priority-1 session in thread 0 must
 * block the priority-2 session in thread 1 via the shared swarmkv
 * per-priority queue-length counters. */
TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
{
    struct stub_pkt_queue expec_tx_queue1;
    struct stub_pkt_queue expec_tx_queue2;
    struct stub_pkt_queue *actual_tx_queue;
    struct shaping_ctx *ctx = NULL;
    struct shaping_flow *sf1 = NULL;
    struct shaping_flow *sf2 = NULL;
    long long rule_ids[] = {1, 2};
    long long rule_id1[] = {1};
    long long rule_id2[] = {2};
    int profile_nums[] = {1, 1};
    int prioritys[] = {1, 2};
    int profile_id[][MAX_REF_PROFILE] = {{0}, {0}};


    TAILQ_INIT(&expec_tx_queue1);
    TAILQ_INIT(&expec_tx_queue2);
    stub_init();

    ctx = shaping_engine_init();
    ASSERT_TRUE(ctx != NULL);
    sf1 = shaping_flow_new();
    ASSERT_TRUE(sf1 != NULL);
    sf2 = shaping_flow_new();
    ASSERT_TRUE(sf2 != NULL);

    stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id);

    stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT);
    actual_tx_queue = stub_get_tx_queue();
    shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1);
    shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1);

    /*******send packets***********/
    for (int i = 0; i < 100; i++) {
        send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0);
        send_packets(&ctx->thread_ctx[1], sf2, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0);

        if (i < 5) {
            ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));
            ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));
        }
    }
    ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));

    for (int i = 0; i < 10; i++) {//the session in thread 1 has priority 2 and is blocked by the priority-1 session in thread 0
        stub_refresh_token_bucket(0);
        polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);
        ASSERT_EQ(-1, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//lower priority: must not emit any packet yet
    }

    while (!TAILQ_EMPTY(&expec_tx_queue1)) {
        stub_refresh_token_bucket(0);
        polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);

        ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
    }

    while (!TAILQ_EMPTY(&expec_tx_queue2)) {
        stub_refresh_token_bucket(0);
        polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]);

        ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2
    }

    ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));

    shaping_flow_free(sf1);
    shaping_flow_free(sf2);
    shaping_engine_destroy(ctx);
    stub_clear_matched_shaping_rules();
}
+
+
+
+
+/*session1 match rule1; session2 match rule2
+ rule1:
+ priority:1
+ primary profile_a: (priority 1)
+ rule2:
+ priority:2
+ primary profile_a: (priority 2)
+
+profile_a: limit 1000
+*/
TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
{
struct stub_pkt_queue expec_tx_queue1;
@@ -1313,14 +1402,14 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order)
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//stream1 priority 1
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
}
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]);
- ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//stream2 priority 2
+ ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2
}
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
@@ -1515,6 +1604,6 @@ TEST(statistics, udp_queueing_pkt)
/* Test entry point: runs the full googletest suite. */
int main(int argc, char **argv)
{
    testing::InitGoogleTest(&argc, argv);
    /* Uncomment to restrict the run to a single test case: */
    //testing::GTEST_FLAG(filter) = "two_session_diff_priority_same_profile.udp_random_tx_in_order";
    return RUN_ALL_TESTS();
}
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index ec7ce6b..f16d351 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -2,9 +2,11 @@
#include <MESA/swarmkv.h>
#include <MESA/maat.h>
+#include <cstdio>
#include <marsio.h>
#include <vector>
#include <stdlib.h>
+#include <stdarg.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
@@ -64,6 +66,7 @@ static int pf_async_times[MAX_STUB_PROFILE_NUM];
vector<struct stub_token_thread_arg> pf_async_thread[MAX_STUB_PROFILE_NUM];
struct stub_matched_rules matched_rules;
struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM];
+static int profile_priority_len[MAX_STUB_PROFILE_NUM][10] = {0};
static unsigned long long curr_time = 1;
@@ -277,6 +280,75 @@ int swarmkv_options_set_log_level(struct swarmkv_options *opts, int loglevel)
return 0;
}
+static void swarmkv_hincrby_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t * cb, void *cb_arg)
+{
+ int profile_id;
+ int priority;
+ int value;
+
+ sscanf(cmd_str, "HINCRBY tsg-shaping-%d priority-%d %d", &profile_id, &priority, &value);
+ profile_priority_len[profile_id][priority] += value;
+
+ cb(NULL, cb_arg);
+
+ return;
+}
+
+static void swarmkv_hmget_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t * cb, void *cb_arg)
+{
+ int profile_id;
+ int priority[10];
+ int ret;
+ int priority_num;
+ struct swarmkv_reply *reply = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply));
+
+ ret = sscanf(cmd_str, "HMGET tsg-shaping-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d",
+ &profile_id, &priority[0], &priority[1], &priority[2], &priority[3], &priority[4], &priority[5], &priority[6], &priority[7], &priority[8]);
+ priority_num = ret - 1;
+
+ reply->type = SWARMKV_REPLY_ARRAY;
+ reply->n_element = priority_num;
+ reply->elements = (struct swarmkv_reply**)calloc(priority_num, sizeof(struct swarmkv_reply*));
+ for (int i = 0; i < priority_num; i++) {
+ reply->elements[i] = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply));
+ reply->elements[i]->type = SWARMKV_REPLY_INTEGER;
+ reply->elements[i]->integer = profile_priority_len[profile_id][priority[i]];
+ }
+
+ cb(reply, cb_arg);
+
+ for(unsigned int i = 0; i < reply->n_element; i++) {
+ if (reply->elements[i]) {
+ free(reply->elements[i]);
+ }
+ }
+ free(reply->elements);
+ free(reply);
+
+}
+
+void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...)
+{
+ char *cmd_str = NULL;
+ char cmd_keyword[32] = {0};
+
+ va_list ap;
+ va_start(ap,format);
+ vasprintf(&cmd_str, format, ap);
+ va_end(ap);
+
+ sscanf(cmd_str, "%31s %*s", cmd_keyword);
+ if (strcmp(cmd_keyword, "HINCRBY") == 0) {
+ swarmkv_hincrby_cmd_func(cmd_str, cb, cb_arg);
+ } else if (strcmp(cmd_keyword, "HMGET") == 0) {
+ swarmkv_hmget_cmd_func(cmd_str, cb, cb_arg);
+ }
+
+ free(cmd_str);
+
+ return;
+}
+
void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg)
{
int actual_tokens;