diff options
| author | root <[email protected]> | 2024-07-16 02:07:45 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-07-16 02:07:45 +0000 |
| commit | f234a3888717e448c3b9821ea82d44d16dbaee78 (patch) | |
| tree | 999135d47a3ad4d0b91a184c7b4a2eb01be683d1 /shaping/test/dummy_swarmkv.cpp | |
| parent | 23ddf75eaad60fd42693dbf6b9558806247dc519 (diff) | |
add temp code
Diffstat (limited to 'shaping/test/dummy_swarmkv.cpp')
| -rw-r--r-- | shaping/test/dummy_swarmkv.cpp | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/shaping/test/dummy_swarmkv.cpp b/shaping/test/dummy_swarmkv.cpp new file mode 100644 index 0000000..c86e524 --- /dev/null +++ b/shaping/test/dummy_swarmkv.cpp @@ -0,0 +1,374 @@ +#include <stdlib.h> +#include <stdarg.h> +#include <vector> +#include <MESA/swarmkv.h> + +#include "shaper.h" +#include "shaper_maat.h" +#include "stub.h" + +using namespace std; + +#define MAX_STUB_RULE_NUM 8 +#define MAX_STUB_PROFILE_NUM 8 + +#define DEFAULT_AVALIABLE_TOKEN_PER_SEC -1 + +struct stub_token_thread_arg { + int profile_id; + struct swarmkv_reply reply; + swarmkv_on_reply_callback_t *cb; + void *cb_arg; +}; + +struct stub_avaliable_token { + int in_limit_bandwidth; + int out_limit_bandwidth; + int bidirection_limit_bandwidth; +}; + +static int profile_priority_len[MAX_STUB_PROFILE_NUM][SHAPING_PRIORITY_NUM_MAX][SHAPING_DIR_MAX]; +static struct stub_avaliable_token pf_curr_avl_token[MAX_STUB_PROFILE_NUM]; +static int pf_async_times[MAX_STUB_PROFILE_NUM]; +vector<struct stub_token_thread_arg> pf_async_thread[MAX_STUB_PROFILE_NUM]; +struct shaping_profile pf_array[MAX_STUB_PROFILE_NUM]; + +void init_dummy_swarmkv() +{ + memset(&pf_array, 0, MAX_STUB_PROFILE_NUM * sizeof(struct shaping_profile)); + memset(&profile_priority_len, 0, MAX_STUB_PROFILE_NUM * SHAPING_PRIORITY_NUM_MAX * SHAPING_DIR_MAX * sizeof(int)); + + for (int i = 0; i < MAX_STUB_PROFILE_NUM; i++) { + pf_curr_avl_token[i].in_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; + pf_curr_avl_token[i].out_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; + pf_curr_avl_token[i].bidirection_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; + pf_array[i].id = i; + pf_array[i].in_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; + pf_array[i].out_limit_bandwidth = DEFAULT_AVALIABLE_TOKEN_PER_SEC; + pf_async_times[i] = 0; + memset(profile_priority_len[i], 0, 10 * sizeof(int)); + } +} + +void * stub_get_token_thread_func(void *data) +{ + struct stub_token_thread_arg *thread_arg; + + thread_arg = (struct stub_token_thread_arg*)data; + + thread_arg->cb(&thread_arg->reply, thread_arg->cb_arg); + + return NULL; +} + +void stub_set_token_bucket_avl_per_sec(int profile_id, unsigned int tokens, unsigned char direction, enum shaping_profile_limit_direction limit_direction) +{ + pf_array[profile_id].limit_direction = limit_direction; + + if (limit_direction == PROFILE_LIMIT_DIRECTION_BIDIRECTION) { + pf_array[profile_id].bidirection_limit_bandwidth = tokens * 8; + pf_curr_avl_token[profile_id].bidirection_limit_bandwidth = tokens * 8; + } else { + if (direction == SHAPING_DIR_IN) { + pf_array[profile_id].in_limit_bandwidth = tokens * 8; + pf_curr_avl_token[profile_id].in_limit_bandwidth = tokens * 8; + } else { + pf_array[profile_id].out_limit_bandwidth = tokens * 8; + pf_curr_avl_token[profile_id].out_limit_bandwidth = tokens * 8; + } + } + + return; +} + +void stub_refresh_token_bucket(int profile_id) +{ + pf_curr_avl_token[profile_id].bidirection_limit_bandwidth = pf_array[profile_id].bidirection_limit_bandwidth; + pf_curr_avl_token[profile_id].in_limit_bandwidth = pf_array[profile_id].in_limit_bandwidth; + pf_curr_avl_token[profile_id].out_limit_bandwidth = pf_array[profile_id].out_limit_bandwidth; + return; +} + +void stub_set_async_token_get_times(int profile_id, int times) +{ + pf_async_times[profile_id] = times; + + if (pf_async_times[profile_id] == 0) { + for (unsigned int i = 0; i < pf_async_thread[profile_id].size(); i++) { + stub_get_token_thread_func(&pf_async_thread[profile_id][i]); + } + pf_async_thread[profile_id].clear(); + } + + return; +} + +/**************stub of swarmkv*****************/ +void swarmkv_register_thread(struct swarmkv *db) +{ + return; +} + +struct swarmkv_options* swarmkv_options_new(void) +{ + return NULL; +} + +int swarmkv_options_set_logger(struct swarmkv_options *opts, void *logger) +{ + return 0; +} + +int swarmkv_options_set_bind_address(struct swarmkv_options *opts, const char* ip_addr) +{ + return 0; +} + +int swarmkv_options_set_cluster_port(struct swarmkv_options *opts, unsigned int cluster_port) +{ + return 0; +} + +int swarmkv_options_set_consul_port(struct swarmkv_options *opts, unsigned int consul_port) +{ + return 0; +} + +int swarmkv_options_set_consul_host(struct swarmkv_options *opts, const char* ip_addr) +{ + return 0; +} + +int swarmkv_options_set_cluster_announce_ip(struct swarmkv_options *opts, const char *ip_addr) +{ + return 0; +} + +int swarmkv_options_set_cluster_announce_port(struct swarmkv_options *opts, unsigned int cluster_announce_port) +{ + return 0; +} + +int swarmkv_options_set_health_check_port(struct swarmkv_options *opts, unsigned int health_check_port) +{ + return 0; +} + +int swarmkv_options_set_health_check_announce_port(struct swarmkv_options *opts, unsigned int health_check_announce_port) +{ + return 0; +} + +struct swarmkv *swarmkv_open(struct swarmkv_options *opts, const char * cluster_name, char **err) +{ + struct swarmkv *db; + + db = (struct swarmkv *)calloc(1, 1); + + return db; +} + +long long swarmkv_caller_get_pending_commands(struct swarmkv *db) +{ + return 0; +} + +int swarmkv_options_set_log_path(struct swarmkv_options *opts, const char *logpath) +{ + return 0; +} + +int swarmkv_options_set_log_level(struct swarmkv_options *opts, int loglevel) +{ + return 0; +} + +static void swarmkv_hincrby_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t * cb, void *cb_arg) +{ + int profile_id; + int priority; + int value; + char direction[5] = {0}; + enum shaping_packet_dir dir; + struct swarmkv_reply *reply = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply)); + + sscanf(cmd_str, "HINCRBY tsg-shaping-%d priority-%d-%s %d", &profile_id, &priority, direction, &value); + if (strncmp(direction, "in", 2) == 0) { + dir = SHAPING_DIR_IN; + } else { + dir = SHAPING_DIR_OUT; + } + + profile_priority_len[profile_id][priority][dir] += value; + + reply->type = SWARMKV_REPLY_INTEGER; + cb(reply, cb_arg); + + free(reply); + + return; +} + +static void swarmkv_hmget_cmd_func(char *cmd_str, swarmkv_on_reply_callback_t * cb, void *cb_arg) +{ + int profile_id; + int priority[10]; + int ret; + int priority_num; + char direction[5] = {0}; + enum shaping_packet_dir dir; + struct swarmkv_reply *reply = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply)); + + ret = sscanf(cmd_str, "HMGET tsg-shaping-%d priority-%d-%s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s priority-%d-%*s", + &profile_id, &priority[0], direction, &priority[1], &priority[2], &priority[3], &priority[4], &priority[5], &priority[6], &priority[7], &priority[8]); + priority_num = ret - 1; + + if (strncmp(direction, "in", 2) == 0) { + dir = SHAPING_DIR_IN; + } else { + dir = SHAPING_DIR_OUT; + } + + reply->type = SWARMKV_REPLY_ARRAY; + reply->n_element = priority_num; + reply->elements = (struct swarmkv_reply**)calloc(priority_num, sizeof(struct swarmkv_reply*)); + for (int i = 0; i < priority_num; i++) { + reply->elements[i] = (struct swarmkv_reply*)calloc(1, sizeof(struct swarmkv_reply)); + reply->elements[i]->type = SWARMKV_REPLY_STRING; + + char tmp_str[128] = {0}; + sprintf(tmp_str, "%d", profile_priority_len[profile_id][priority[i]][dir]); + reply->elements[i]->str = (char *)calloc(1, strlen(tmp_str)); + memcpy(reply->elements[i]->str, tmp_str, strlen(tmp_str)); + reply->elements[i]->len = strlen(tmp_str); + } + + cb(reply, cb_arg); + + for(unsigned int i = 0; i < reply->n_element; i++) { + if (reply->elements[i]) { + free(reply->elements[i]->str); + free(reply->elements[i]); + } + } + free(reply->elements); + free(reply); + +} + +void swarmkv_async_command(struct swarmkv *db, swarmkv_on_reply_callback_t * cb, void *cb_arg, const char *format, ...) +{ + char *cmd_str = NULL; + char cmd_keyword[32] = {0}; + + va_list ap; + va_start(ap,format); + vasprintf(&cmd_str, format, ap); + va_end(ap); + + sscanf(cmd_str, "%31s %*s", cmd_keyword); + if (strcmp(cmd_keyword, "HINCRBY") == 0) { + swarmkv_hincrby_cmd_func(cmd_str, cb, cb_arg); + } else if (strcmp(cmd_keyword, "HMGET") == 0) { + swarmkv_hmget_cmd_func(cmd_str, cb, cb_arg); + } + + free(cmd_str); + + return; +} + +void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + int actual_tokens; + struct stub_token_thread_arg thread_arg; + struct swarmkv_reply reply; + int profile_id; + char direction[16] = {0}; + + sscanf(key, "tsg-shaping-%d-%15s", &profile_id, direction); + + if (strncmp("bidirectional", direction, sizeof(direction)) == 0) { + if (pf_curr_avl_token[profile_id].bidirection_limit_bandwidth == DEFAULT_AVALIABLE_TOKEN_PER_SEC) { + actual_tokens = tokens; + } else { + actual_tokens = pf_curr_avl_token[profile_id].bidirection_limit_bandwidth >= tokens ? tokens : 0; + pf_curr_avl_token[profile_id].bidirection_limit_bandwidth -= actual_tokens; + } + } else if (strncmp("incoming", direction, sizeof(direction)) == 0) { + if (pf_curr_avl_token[profile_id].in_limit_bandwidth == DEFAULT_AVALIABLE_TOKEN_PER_SEC) { + actual_tokens = tokens; + } else { + actual_tokens = pf_curr_avl_token[profile_id].in_limit_bandwidth >= tokens ? tokens : 0; + pf_curr_avl_token[profile_id].in_limit_bandwidth -= actual_tokens; + } + } else { + if (pf_curr_avl_token[profile_id].out_limit_bandwidth == DEFAULT_AVALIABLE_TOKEN_PER_SEC) { + actual_tokens = tokens; + } else { + actual_tokens = pf_curr_avl_token[profile_id].out_limit_bandwidth >= tokens ? tokens : 0; + pf_curr_avl_token[profile_id].out_limit_bandwidth -= actual_tokens; + } + } + + if (pf_async_times[profile_id] == 0) { + for (unsigned int i = 0; i < pf_async_thread[profile_id].size(); i++) { + stub_get_token_thread_func(&pf_async_thread[profile_id][i]); + } + pf_async_thread[profile_id].clear(); + } + + reply.integer = actual_tokens; + reply.type = SWARMKV_REPLY_INTEGER; + + if (pf_async_times[profile_id] > 0) { + pf_async_times[profile_id]--; + + thread_arg.profile_id = profile_id; + thread_arg.reply = reply; + thread_arg.cb = cb; + thread_arg.cb_arg = cb_arg; + + pf_async_thread[profile_id].push_back(thread_arg); + + } else { + cb(&reply, cb_arg); + } + + return; +} + +void swarmkv_ftconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long weight, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + swarmkv_tconsume(db, key, keylen, tokens, cb, cb_arg); + return; +} + +void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + return; +} + +void swarmkv_close(struct swarmkv * db) +{ + if (db) { + free(db); + } + return; +} + +void swarmkv_caller_loop(struct swarmkv *db, int flags, struct timeval *tv) +{ + return; +} + +int swarmkv_options_set_caller_thread_number(struct swarmkv_options *opts, int nr_caller_threads) +{ + return 0; +} + +int swarmkv_options_set_worker_thread_number(struct swarmkv_options *opts, int nr_worker_threads) +{ + return 0; +} +/**********************************************/
\ No newline at end of file |
