diff options
| author | 刘畅 <[email protected]> | 2023-04-04 12:27:33 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2023-04-04 12:27:33 +0000 |
| commit | 00d035db8063aef61076138b116fc06dde2ea4f0 (patch) | |
| tree | 48e50859012bf900ba07bdf3ad9f85d2701d327a | |
| parent | 9a0ff4d68c0d165ca4c65c850dfed9c2c7dd4c80 (diff) | |
| parent | d92e71f1082c9f38ca22e762d1dd7ba8fd7c0aa9 (diff) | |
Merge branch 'priority_by_swarmkv' into 'rel'
Priority by swarmkv
See merge request tango/shaping-engine!7
| -rw-r--r-- | shaping/include/shaper.h | 4 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 117 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 95 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 72 |
4 files changed, 245 insertions, 43 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 56e129f..41a7ea1 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -70,8 +70,10 @@ struct shaping_profile_info { int priority; int in_deposit_token; int out_deposit_token; - int async_ref_count; + int async_token_ref_count; + int async_queue_len_ref_count; unsigned long long enqueue_time_us;//to calculate max latency + unsigned char is_priority_blocked; unsigned char is_invalid; }; diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 51474e3..64775e0 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -27,6 +27,20 @@ extern "C" { #define MICRO_SECONDS_PER_SEC 1000000 #define NANO_SECONDS_PER_SEC 1000000000 +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 " priority-3" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4 " priority-4" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5 " priority-5" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6 " priority-6" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7 " priority-7" +#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8 " priority-8" + +const char *swarmkv_queue_len_get_cmd[] = {"", SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3, + SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_4, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_5, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_6, + SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_7, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_8, SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_9}; + struct shaper {//trees in one thread struct avl_tree *priority_trees[SHAPING_PRIORITY_NUM_MAX];//represent 10 avl tree corresponding to 10 priority }; @@ -39,6 +53,7 @@ struct shaping_node {//a session will have 10 nodes, corresponding 10 avl tree struct shaping_async_cb_arg { struct shaping_flow *sf; struct shaping_profile_info *s_pf_info; + int priority; unsigned char direction; }; @@ -214,6 +229,11 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) return; } +static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * arg) +{ + return; +} + //return success(0) while any avl tree insert success int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time) { @@ -232,6 +252,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority); shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); @@ -246,6 +267,7 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns); if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) { ret = 0; + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority); shaper_stat_queueing_pkt_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); shaper_stat_queueing_session_inc(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); @@ -285,6 +307,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->primary.priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->primary.id, priority, SHAPING_PROFILE_TYPE_PRIMARY, ctx->thread_index); @@ -302,6 +326,8 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) priority = s_rule_info->borrowing[i].priority; if (avl_node_in_tree(s_node->avl_node[priority])) { avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]); + swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, NULL, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority); + shaper_stat_queueing_pkt_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, pkt_wrapper->direction, pkt_wrapper->length, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); shaper_stat_queueing_session_dec(ctx->stat, s_rule_info->id, s_rule_info->borrowing[i].id, priority, SHAPING_PROFILE_TYPE_BORROW, ctx->thread_index); @@ -368,7 +394,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg END: free(cb_arg); - __atomic_sub_fetch(&s_pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST); + __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); shaping_flow_free(sf);//sub ref count and decide if need to free return; @@ -405,7 +431,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow struct shaping_async_cb_arg *arg; char key[32] = {0}; - __atomic_add_fetch(&pf_info->async_ref_count, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); @@ -415,7 +441,7 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow arg->direction = direction; swarmkv_tconsume(db, key, strlen(key), req_token, shaper_token_get_cb, arg); - if (__atomic_load_n(&pf_info->async_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed + if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed shaper_deposit_token_sub(pf_info, req_token, direction); return 0; } @@ -436,57 +462,70 @@ static int shaper_token_get_from_profile(struct swarmkv *db, struct shaping_flow return -1; } -#if 0 -int shaper_token_consume(struct shaping_flow *sf, unsigned int req_token, struct shaping_rule_info *s_rule_info) +static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb_arg) { - int i; + struct shaping_async_cb_arg *arg = (struct shaping_async_cb_arg *)cb_arg; + struct shaping_profile_info *s_pf_info = arg->s_pf_info; + struct shaping_flow *sf = arg->sf; + + s_pf_info->is_priority_blocked = 0; - if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->primary, 1, req_token)) { - return SHAPING_SUCCESS; + if (!reply || reply->type != SWARMKV_REPLY_ARRAY) { + goto END; } - if (s_rule_info->borrowing_num > 0) { - for (i = 0; i < s_rule_info->borrowing_num; i++) { - if (SHAPING_SUCCESS == shaping_token_get_from_profile(&s_rule_info->borrowing[i], 0, req_token)) { - return SHAPING_SUCCESS; - } + for (unsigned int i = 0; i < reply->n_element; i++) { + if (reply->elements[i] && reply->elements[i]->integer > 0) { + s_pf_info->is_priority_blocked = 1; + break; } } - return SHAPING_FAILED; +END: + free(cb_arg); + __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); + shaping_flow_free(sf);//sub ref count and decide if need to free } -#endif -static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token, - struct shaping_profile_info *profile, int profile_type, unsigned char direction) +static int shaper_profile_is_priority_blocked(struct swarmkv *db, struct shaping_flow *sf, struct shaping_profile_info *profile) { - return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction); - -} + struct shaping_async_cb_arg *arg; + int priority = profile->priority; -#if 0 -enum shaping_packet_action shaper_pkt_action_decide(struct shaping_flow *sf, struct shaper *sp, void *raw_pkt, - unsigned int pkt_len, unsigned char direction, unsigned long long income_time) -{ - int i; - struct shaping_rule_info *s_rule_info; - - for (i = sf->anchor; i < sf->rule_num; i++) { - s_rule_info = &sf->matched_rule_infos[i]; - if (-1 == shaper_token_consume(sf, pkt_len, s_rule_info, s_rule_info->primary.priority)) { - sf->anchor = i; - if (0 == shaper_flow_push(sf, sp)) { - shaper_packet_enqueue(sf, raw_pkt, direction, income_time); - return SHAPING_HOLD; - } else { - return SHAPING_DROP; - } + if (priority == 0) {//highest priority, can't be blocked + return 0; + } + + arg = (struct shaping_async_cb_arg *)calloc(1, sizeof(struct shaping_async_cb_arg)); + arg->s_pf_info = profile; + arg->sf = sf; + arg->priority = priority; + + __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST); + __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST); + + swarmkv_async_command(db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); + + if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) { + return 0; + } else { + if (profile->is_priority_blocked) { + return 1; + } else { + return 0; } } +} - return SHAPING_FORWARD; +static int shaper_token_consume(struct swarmkv *db, struct shaping_flow *sf, int req_token, + struct shaping_profile_info *profile, int profile_type, unsigned char direction) +{ + if (shaper_profile_is_priority_blocked(db, sf, profile)) { + return -1; + } else { + return shaper_token_get_from_profile(db, sf, profile, profile_type, req_token, direction); + } } -#endif int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, struct shaping_profile_container pf_container[]) { diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 9c70e84..3f18a99 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -211,6 +211,8 @@ static void shaping_global_stat_judge(char *file_line, int tx_pkts, int tx_bytes EXPECT_EQ(queueing_pkts, shaping_global_stat_field_get(metrics, "queueing_pkts")); EXPECT_EQ(queueing_bytes, shaping_global_stat_field_get(metrics, "queueing_bytes")); + cJSON_Delete(json); + return; } @@ -1245,6 +1247,93 @@ TEST(two_session_same_rule, udp_tx_in_order) profile_a: limit 1000 */ +TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) +{ + struct stub_pkt_queue expec_tx_queue1; + struct stub_pkt_queue expec_tx_queue2; + struct stub_pkt_queue *actual_tx_queue; + struct shaping_ctx *ctx = NULL; + struct shaping_flow *sf1 = NULL; + struct shaping_flow *sf2 = NULL; + long long rule_ids[] = {1, 2}; + long long rule_id1[] = {1}; + long long rule_id2[] = {2}; + int profile_nums[] = {1, 1}; + int prioritys[] = {1, 2}; + int profile_id[][MAX_REF_PROFILE] = {{0}, {0}}; + + + TAILQ_INIT(&expec_tx_queue1); + TAILQ_INIT(&expec_tx_queue2); + stub_init(); + + ctx = shaping_engine_init(); + ASSERT_TRUE(ctx != NULL); + sf1 = shaping_flow_new(); + ASSERT_TRUE(sf1 != NULL); + sf2 = shaping_flow_new(); + ASSERT_TRUE(sf2 != NULL); + + stub_set_matched_shaping_rules(2, rule_ids, prioritys, profile_nums, profile_id); + + stub_set_token_bucket_avl_per_sec(0, 1000, SHAPING_DIR_OUT); + actual_tx_queue = stub_get_tx_queue(); + shaper_rules_update(&ctx->thread_ctx[0], sf1, rule_id1, 1); + shaper_rules_update(&ctx->thread_ctx[1], sf2, rule_id2, 1); + + /*******send packets***********/ + for (int i = 0; i < 100; i++) { + send_packets(&ctx->thread_ctx[0], sf1, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue1, 1, 0); + send_packets(&ctx->thread_ctx[1], sf2, 1, 100, SHAPING_DIR_OUT, &expec_tx_queue2, 1, 0); + + if (i < 5) { + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1)); + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1)); + } + } + ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); + + for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 + stub_refresh_token_bucket(0); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + ASSERT_EQ(-1, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//优先级低,不能发出报文 + } + + while (!TAILQ_EMPTY(&expec_tx_queue1)) { + stub_refresh_token_bucket(0); + polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); + + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 + } + + while (!TAILQ_EMPTY(&expec_tx_queue2)) { + stub_refresh_token_bucket(0); + polling_entry(ctx->thread_ctx[1].sp, ctx->thread_ctx[1].stat, &ctx->thread_ctx[1]); + + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2 + } + + ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); + + shaping_flow_free(sf1); + shaping_flow_free(sf2); + shaping_engine_destroy(ctx); + stub_clear_matched_shaping_rules(); +} + + + + +/*session1 match rule1; session2 match rule2 + rule1: + priority:1 + primary profile_a: (priority 1) + rule2: + priority:2 + primary profile_a: (priority 2) + +profile_a: limit 1000 +*/ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) { struct stub_pkt_queue expec_tx_queue1; @@ -1313,14 +1402,14 @@ TEST(two_session_diff_priority_same_profile, udp_random_tx_in_order) stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//stream1 priority 1 + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 } while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); polling_entry(ctx->thread_ctx[0].sp, ctx->thread_ctx[0].stat, &ctx->thread_ctx[0]); - ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//stream2 priority 2 + ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 1));//sf2 priority 2 } ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); @@ -1515,6 +1604,6 @@ TEST(statistics, udp_queueing_pkt) int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - //testing::GTEST_FLAG(filter) = "statistics.udp_queueing_pkt"; + //testing::GTEST_FLAG(filter) = "two_session_diff_priority_same_profile.udp_random_tx_in_order"; return RUN_ALL_TESTS(); }
\ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index ec7ce6b..f16d351 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -2,9 +2,11 @@ #include <MESA/swarmkv.h> #include <MESA/maat.h> +#include <cstdio> #include <marsio.h> #include <vector> #include <stdlib.h> +#include <stdarg.h> #include <unistd.h> #include <time.h> #include <sys/time.h> @@ -64,6 +66,7 @@ static int pf_async_times[MAX_STUB_PROFILE_NUM]; vector<struct stub_token_thread_arg> pf_async_thread[MAX_STUB_PROFILE_NUM]; struct stub_matched_rules matched_rules; struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM]; +static int profile_priority_len[MAX_STUB_PROFILE_NUM][10] = {0}; static unsigned long long curr_time = 1; @@ -277,6 +280,75 @@ int swarmkv_options_set_log_level(struct swarmkv_options *opts, int loglevel) return 0; } +static void swarmkv_hincrby_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t * cb, void *cb_arg) +{ + int profile_id; + int priority; + int value; + + sscanf(cmd_str, "HINCRBY tsg-shaping-%d priority-%d %d", &profile_id, &priority, &value); + profile_priority_len[profile_id][priority] += value; + + cb(NULL, cb_arg); + + return; +} + +static void swarmkv_hmget_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t * cb, void *cb_arg) +{ + int profile_id; + int priority[10]; + int ret; + int priority_num; + struct swarmkv_reply *reply = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply)); + + ret = sscanf(cmd_str, "HMGET tsg-shaping-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d priority-%d", + &profile_id, &priority[0], &priority[1], &priority[2], &priority[3], &priority[4], &priority[5], &priority[6], &priority[7], &priority[8]); + priority_num = ret - 1; + + reply->type = SWARMKV_REPLY_ARRAY; + reply->n_element = priority_num; + reply->elements = (struct swarmkv_reply**)calloc(priority_num, sizeof(struct swarmkv_reply*)); + for (int i = 0; i < priority_num; i++) { + reply->elements[i] = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply)); + reply->elements[i]->type = SWARMKV_REPLY_INTEGER; + reply->elements[i]->integer = profile_priority_len[profile_id][priority[i]]; + } + + cb(reply, cb_arg); + + for(unsigned int i = 0; i < reply->n_element; i++) { + if (reply->elements[i]) { + free(reply->elements[i]); + } + } + free(reply->elements); + free(reply); + +} + +void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...) +{ + char *cmd_str = NULL; + char cmd_keyword[32] = {0}; + + va_list ap; + va_start(ap,format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + + sscanf(cmd_str, "%31s %*s", cmd_keyword); + if (strcmp(cmd_keyword, "HINCRBY") == 0) { + swarmkv_hincrby_cmd_func(cmd_str, cb, cb_arg); + } else if (strcmp(cmd_keyword, "HMGET") == 0) { + swarmkv_hmget_cmd_func(cmd_str, cb, cb_arg); + } + + free(cmd_str); + + return; +} + void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) { int actual_tokens; |
