diff options
| author | yangwei <[email protected]> | 2023-03-15 22:21:10 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2023-03-15 22:24:52 +0800 |
| commit | 2f759b2e0afa5ab298d2e5bd0df8439a3dba624a (patch) | |
| tree | 351feef03d718b118fee0b943a9213bdc1e980e9 | |
| parent | 2382859c91b5feb95cfceda1c4200ae91c071a8d (diff) | |
✨ feat(fair token bucket throttler): Added ftbt and test casesFeature-add-FTBT
| -rw-r--r-- | CRDT/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | CRDT/crdt_gtest.cpp | 175 | ||||
| -rw-r--r-- | CRDT/ftb_throttler.c (renamed from CRDT/ftbf_throttle.c) | 90 | ||||
| -rw-r--r-- | CRDT/ftb_throttler.h | 46 | ||||
| -rw-r--r-- | CRDT/ftbf_throttle.h | 42 | ||||
| -rw-r--r-- | CRDT/ftbt_gtest.cpp (renamed from CRDT/ftbf_gtest.cpp) | 103 |
6 files changed, 186 insertions, 276 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt index 52d19dc..cbfc9aa 100644 --- a/CRDT/CMakeLists.txt +++ b/CRDT/CMakeLists.txt @@ -1,7 +1,7 @@ add_definitions(-D_GNU_SOURCE) add_definitions(-fPIC) -add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c oc_token_bucket.c ftbf_throttle.c) +add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c oc_token_bucket.c ftb_throttler.c) include_directories(${PROJECT_SOURCE_DIR}/deps/mpack ${PROJECT_SOURCE_DIR}/deps/uthash @@ -11,9 +11,9 @@ include_directories(${PROJECT_SOURCE_DIR}/deps/mpack add_executable(CRDT_gtest crdt_gtest.cpp ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c) target_link_libraries(CRDT_gtest CRDT gtest-static uuid) -add_executable(FTBF_gtest ftbf_gtest.cpp ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c +add_executable(FTBT_gtest ftbt_gtest.cpp ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c ${PROJECT_SOURCE_DIR}/deps/hashfun/hash_fun.c ${PROJECT_SOURCE_DIR}/deps/alias/ransampl.c ${PROJECT_SOURCE_DIR}/CRDT/util/fair_index.cpp ${PROJECT_SOURCE_DIR}/CRDT/util/flow_tokens.cpp) -target_link_libraries(FTBF_gtest CRDT gtest-static uuid)
\ No newline at end of file +target_link_libraries(FTBT_gtest CRDT gtest-static uuid)
\ No newline at end of file diff --git a/CRDT/crdt_gtest.cpp b/CRDT/crdt_gtest.cpp index f418585..3166d3c 100644 --- a/CRDT/crdt_gtest.cpp +++ b/CRDT/crdt_gtest.cpp @@ -35,87 +35,56 @@ enum traffic_type LIGHT_UNIFORM_TYPE, LIGHT_TWO_EIGHT_TYPE, HEAVY_TWO_EIGHT_TYPE, - HEAVY_UNIFORM_TYPE + HEAVY_UNIFORM_EXTREME_TYPE }; -enum configure_command -{ - ASYNC_DEFAULT, - ASYNC_ADD_NODE, - ASYNC_ADJUST_MERGE, - CONCURRNET_DEFAULT, - CONCURRENT_ADD_NODE, - CONCURRENT_ADJUST_MERGE -}; - -long long get_request_tokens(size_t index, enum traffic_type type, long long step_us, long long CIR, long long CBS, size_t replica_num, size_t new_replica_bindex) +long long get_request_tokens(int index, enum traffic_type type, long long step_us, long long CIR, long long CBS) { long long request_size; - double average_ratio2, average_ratio8, total_ratio2, total_ratio8; - long long standard = CIR*step_us/1000000; - long long sdf = floor((long double)standard * 0.05); //floating 5% - size_t occupy8_replica_num = floor(replica_num * 0.8); - long long rand_sdf = random()%sdf; - size_t scope_flag = FALSE; - if(random()%2) scope_flag = TRUE; - if(new_replica_bindex > 0 && index >= new_replica_bindex) - { - request_size = (long long)floor((long double)standard * 0.3); //request for new nodes - if (scope_flag) { - request_size += rand_sdf; - } else { - request_size -= rand_sdf; - } - return request_size; - } + long long standard = CIR * step_us / 1000000; + long long sd10 = floor((long double)standard * 0.1); + int eight_replica_num = floor(REPLICA_NUMBER * 0.8); + long long rand_sd10 = random() % sd10; + int scope_flag = FALSE; + if (random() % 2) scope_flag = TRUE; switch (type) { case LIGHT_UNIFORM_TYPE: request_size = (long long)floor((long double)standard * 0.5); if (scope_flag) { - request_size += rand_sdf; + request_size += rand_sd10; } else { - request_size -= rand_sdf; + request_size -= rand_sd10; } break; case LIGHT_TWO_EIGHT_TYPE: - //average 0.67*CIR per node - total_ratio2 = (0.67*replica_num) / 5; - total_ratio8 = (0.67*replica_num) - total_ratio2; - average_ratio2 = total_ratio2 / occupy8_replica_num; - average_ratio8 = total_ratio8 / (replica_num-occupy8_replica_num); - if (index < occupy8_replica_num && scope_flag) { - request_size = (long long)floor((long double)standard * average_ratio2) + rand_sdf; - } else if (index < occupy8_replica_num && !scope_flag) { - request_size = (long long)floor((long double)standard * average_ratio2) - rand_sdf; - } else if (index >= occupy8_replica_num && scope_flag) { - request_size = (long long)floor((long double)standard * average_ratio8) + rand_sdf; + if (index < eight_replica_num && scope_flag) { + request_size = (long long)floor((long double)standard * 0.2) + rand_sd10; + } else if (index < eight_replica_num && !scope_flag) { + request_size = (long long)floor((long double)standard * 0.2) - rand_sd10; + } else if (index >= eight_replica_num && scope_flag) { + request_size = (long long)floor((long double)standard * 1.6) + rand_sd10; } else { - request_size = (long long)floor((long double)standard * average_ratio8) - rand_sdf; + request_size = (long long)floor((long double)standard * 1.6) - rand_sd10; } break; case HEAVY_TWO_EIGHT_TYPE: - //average 2*CIR per node - total_ratio2 = (2.0*replica_num) / 5; - total_ratio8 = (2.0*replica_num) - total_ratio2; - average_ratio2 = total_ratio2 / occupy8_replica_num; - average_ratio8 = total_ratio8 / (replica_num-occupy8_replica_num); - if (index < occupy8_replica_num && scope_flag) { - request_size = (long long)floor((long double)standard * average_ratio2) + rand_sdf; - } else if (index < occupy8_replica_num && !scope_flag) { - request_size = (long long)floor((long double)standard * average_ratio2) - rand_sdf; - } else if (index >= occupy8_replica_num && scope_flag) { - request_size = (long long)floor((long double)standard * average_ratio8) + rand_sdf; + if (index < eight_replica_num && scope_flag) { + request_size = (long long)floor((long double)standard * 0.6) + rand_sd10; + } else if (index < eight_replica_num && !scope_flag) { + request_size = (long long)floor((long double)standard * 0.6) - rand_sd10; + } else if (index >= eight_replica_num && scope_flag) { + request_size = (long long)floor((long double)standard * 4.8) + rand_sd10; } else { - request_size = (long long)floor((long double)standard * average_ratio8) - rand_sdf; + request_size = (long long)floor((long double)standard * 4.8) - rand_sd10; } break; - case HEAVY_UNIFORM_TYPE: + case HEAVY_UNIFORM_EXTREME_TYPE: request_size = (long long)floor((long double)standard * 2); if (scope_flag) { - request_size += rand_sdf; + request_size += rand_sd10; } else { - request_size -= rand_sdf; + request_size -= rand_sd10; } break; default: @@ -124,18 +93,16 @@ long long get_request_tokens(size_t index, enum traffic_type type, long long ste return request_size; } -void traffic_distribution(traffic_type type, enum configure_command configure=ASYNC_DEFAULT) +void traffic_distribution(traffic_type type) { - size_t init_replica_num = REPLICA_NUMBER, new_replica_bindex = 0; - size_t replica_num = init_replica_num; - struct OC_token_bucket *buckets[100]; + struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i = 0, j = 0; long long CIR = 1*1024*1024; // 1Mps long long CBS = 2*1024*1024; uuid_t uuid; struct timeval start; gettimeofday(&start, NULL); - for (i = 0; i < replica_num; i++) + for (i = 0; i < REPLICA_NUMBER; i++) { uuid_generate(uuid); buckets[i] = OC_token_bucket_new(uuid, start, CIR, CBS); @@ -148,56 +115,27 @@ void traffic_distribution(traffic_type type, enum configure_command configure=AS memcpy(&now, &start, sizeof(now)); step.tv_sec = 0; step.tv_usec = (suseconds_t)step_us; - int concurrent_flag = FALSE; - if(configure == CONCURRNET_DEFAULT || configure == CONCURRENT_ADD_NODE || configure == CONCURRENT_ADJUST_MERGE) - { - concurrent_flag = TRUE; - } for (i = 0; (long long)i < mimic_duration_us / step_us; i++) { + j = i % 3; // sequence selection timeradd(&now, &step, &now); - for(j = 0; j < replica_num; j++) - { - if(!concurrent_flag) - { - j = i % replica_num; // sequence selection - tokens = get_request_tokens(j, type, step_us, CIR, CBS, init_replica_num, new_replica_bindex); - }else - { - tokens = get_request_tokens(j, type, step_us, CIR, CBS, init_replica_num, new_replica_bindex); - if(new_replica_bindex <= 0 || j < new_replica_bindex) - tokens = round((long double)tokens/init_replica_num); - } - flexible_tokens = OC_token_bucket_control(buckets[j], now, OCTB_CMD_CONSUME_FLEXIBLE, tokens); - requested += tokens; - consumed += flexible_tokens; - if(configure == ASYNC_ADJUST_MERGE || configure == CONCURRENT_ADJUST_MERGE) - { - if(i % 10000 == 0) - { - OC_token_bucket_sync(buckets, replica_num); - } - }else - { - OC_token_bucket_sync(buckets, replica_num); - } - if(!concurrent_flag) break; - } - if((configure == ASYNC_ADD_NODE || configure == CONCURRENT_ADD_NODE) && ((i == 10000) || (i == 50000))) //two replica added + tokens = get_request_tokens(j, type, step_us, CIR, CBS); + flexible_tokens = OC_token_bucket_control(buckets[j], now, OCTB_CMD_CONSUME_FLEXIBLE, tokens); + requested += tokens; + consumed += flexible_tokens; + if(i%100==0) { - uuid_generate(uuid); - buckets[replica_num] = OC_token_bucket_new((const char *)uuid, sizeof(uuid), now, CIR, CBS); - new_replica_bindex = replica_num; - replica_num++; + OC_token_bucket_sync(buckets, REPLICA_NUMBER); } } upper_limit = CBS + CIR * timeval_delta_ms(start, now) / 1000; refilled = OC_token_bucket_control(buckets[0], now, OCTB_CMD_READ_REFILLED, 0); EXPECT_LE(consumed, requested); double accuracy = (double)consumed / MIN(refilled, requested); + EXPECT_NEAR(accuracy, 1, 0.01); printf("accuracy:%f, upper_limit:%lld, refilled:%lld, requested:%lld, consumed:%lld\n", accuracy, upper_limit, refilled, requested, consumed); - for(i = 0; i < replica_num; i++) + for(i = 0; i < REPLICA_NUMBER; i++) { OC_token_bucket_free(buckets[i]); } @@ -205,42 +143,10 @@ void traffic_distribution(traffic_type type, enum configure_command configure=AS TEST(OCTokenBucket, TrafficTypeConsumer) { - printf("[ASYNC_DEFAULT]\n"); traffic_distribution(LIGHT_UNIFORM_TYPE); traffic_distribution(LIGHT_TWO_EIGHT_TYPE); traffic_distribution(HEAVY_TWO_EIGHT_TYPE); - traffic_distribution(HEAVY_UNIFORM_TYPE); - - //异步模式下的追加节点,UNIFORM类型请求量降低,主要是因为总请求量一定,新加节点请求量低于平均水平 - printf("[ASYNC_ADD_NODE]\n"); - traffic_distribution(LIGHT_UNIFORM_TYPE, ASYNC_ADD_NODE); - traffic_distribution(LIGHT_TWO_EIGHT_TYPE, ASYNC_ADD_NODE); - traffic_distribution(HEAVY_TWO_EIGHT_TYPE, ASYNC_ADD_NODE); - traffic_distribution(HEAVY_UNIFORM_TYPE, ASYNC_ADD_NODE); - - printf("[ASYNC_ADJUST_MERGE]\n"); - traffic_distribution(LIGHT_UNIFORM_TYPE, ASYNC_ADJUST_MERGE); - traffic_distribution(LIGHT_TWO_EIGHT_TYPE, ASYNC_ADJUST_MERGE); - traffic_distribution(HEAVY_TWO_EIGHT_TYPE, ASYNC_ADJUST_MERGE); - traffic_distribution(HEAVY_UNIFORM_TYPE, ASYNC_ADJUST_MERGE); - - printf("[CONCURRENT_DEFAULT]\n"); - traffic_distribution(LIGHT_UNIFORM_TYPE, CONCURRNET_DEFAULT); - traffic_distribution(LIGHT_TWO_EIGHT_TYPE, CONCURRNET_DEFAULT); - traffic_distribution(HEAVY_TWO_EIGHT_TYPE, CONCURRNET_DEFAULT); - traffic_distribution(HEAVY_UNIFORM_TYPE, CONCURRNET_DEFAULT); - - printf("[CONCURRENT_ADD_NODE]\n"); - traffic_distribution(LIGHT_UNIFORM_TYPE, CONCURRENT_ADD_NODE); - traffic_distribution(LIGHT_TWO_EIGHT_TYPE, CONCURRENT_ADD_NODE); - traffic_distribution(HEAVY_TWO_EIGHT_TYPE, CONCURRENT_ADD_NODE); - traffic_distribution(HEAVY_UNIFORM_TYPE, CONCURRENT_ADD_NODE); - - printf("[CONCURRENT_ADJUST_MERGE]\n"); - traffic_distribution(LIGHT_UNIFORM_TYPE, CONCURRENT_ADJUST_MERGE); - traffic_distribution(LIGHT_TWO_EIGHT_TYPE, CONCURRENT_ADJUST_MERGE); - traffic_distribution(HEAVY_TWO_EIGHT_TYPE, CONCURRENT_ADJUST_MERGE); - traffic_distribution(HEAVY_UNIFORM_TYPE, CONCURRENT_ADJUST_MERGE); + traffic_distribution(HEAVY_UNIFORM_EXTREME_TYPE); } TEST(OCTokenBucket, Basic) @@ -460,7 +366,6 @@ TEST(OCTokenBucket, LightConsumer) } } } - TEST(OCTokenBucket, ConcurrentHeavyConsumer) { struct OC_token_bucket *buckets[REPLICA_NUMBER]; diff --git a/CRDT/ftbf_throttle.c b/CRDT/ftb_throttler.c index 25374c6..0c6d0ae 100644 --- a/CRDT/ftbf_throttle.c +++ b/CRDT/ftb_throttler.c @@ -1,4 +1,4 @@ -#include "ftbf_throttle.h" +#include "ftb_throttler.h" #include "crdt_utils.h" #include "hash_fun.h" #include "pn_counter.h" @@ -12,7 +12,7 @@ #define MAX_SKETCH_WIDTH 1024 #define MAX_QUEUE_DEPTH 2000 -struct ftbf_class_id +struct ftbt_class_id { long long class_hash[MAX_SKETCH_DEPTH]; unsigned char weight; @@ -20,7 +20,7 @@ struct ftbf_class_id struct active_class_queue { - struct ftbf_class_id queue[MAX_QUEUE_DEPTH]; + struct ftbt_class_id queue[MAX_QUEUE_DEPTH]; long long head; long long tail; long long len; @@ -34,16 +34,16 @@ struct count_min_sketch size_t width; }; -struct ftbf_configuration +struct ftbt_configuration { long long CIR; //Committed Information Rate long long CBS; //Committed Burst Size long long refill_duration_ms; struct timeval write_timestamp; }; -struct ftbf_throttle +struct ftb_throttler { - struct ftbf_configuration cfg; + struct ftbt_configuration cfg; long long refilled; struct timeval refill_timestamp; struct PN_counter *consumed; @@ -51,7 +51,7 @@ struct ftbf_throttle struct active_class_queue active_q; }; -static void ftbf_fill_class_id(struct ftbf_class_id *cid, struct ftbf_throttle *t, const char *req_class_id, unsigned char weight) +static void ftbt_fill_class_id(struct ftbt_class_id *cid, struct ftb_throttler *t, const char *req_class_id, unsigned char weight) { for(size_t i = 0; i < t->sketch.depth; ++i) { @@ -60,7 +60,7 @@ static void ftbf_fill_class_id(struct ftbf_class_id *cid, struct ftbf_throttle * cid->weight = weight; } -static void ftbf_update_sketch(struct count_min_sketch *sketch, long long *hash, long long count) +static void ftbt_update_sketch(struct count_min_sketch *sketch, long long *hash, long long count) { size_t i = 0; long long hashval = 0; @@ -70,7 +70,7 @@ static void ftbf_update_sketch(struct count_min_sketch *sketch, long long *hash, sketch->occupied_token[i][hashval] += count; } } -static long long ftbf_estimate_sketch(struct count_min_sketch sketch, long long *hash) +static long long ftbt_estimate_sketch(struct count_min_sketch sketch, long long *hash) { size_t i = 0; long long minval = LLONG_MAX; @@ -83,7 +83,7 @@ static long long ftbf_estimate_sketch(struct count_min_sketch sketch, long long return minval; } -static int ftbf_aq_enqueue(struct ftbf_class_id *cid, struct ftbf_throttle *t) +static int ftbt_aq_enqueue(struct ftbt_class_id *cid, struct ftb_throttler *t) { struct active_class_queue *aq = &t->active_q; if(aq->len >= MAX_QUEUE_DEPTH) @@ -91,14 +91,14 @@ static int ftbf_aq_enqueue(struct ftbf_class_id *cid, struct ftbf_throttle *t) printf("Unexpected: active flow queue full.\n"); return FALSE; } - memcpy(&(aq->queue[aq->tail]), cid, sizeof(struct ftbf_class_id)); + memcpy(&(aq->queue[aq->tail]), cid, sizeof(struct ftbt_class_id)); aq->tail = (aq->tail + 1) % MAX_QUEUE_DEPTH; aq->len++; return TRUE; } //update weight -static void ftbf_operate_weight(struct ftbf_class_id cid, struct ftbf_throttle *t) +static void ftbt_operate_weight(struct ftbt_class_id cid, struct ftb_throttler *t) { struct active_class_queue *aq = &t->active_q; size_t i,j; @@ -115,7 +115,7 @@ static void ftbf_operate_weight(struct ftbf_class_id cid, struct ftbf_throttle * aq->queue[i].weight = cid.weight; } -static void ftbf_aq_dequeue(struct ftbf_throttle *t, long long available) +static void ftbt_aq_dequeue(struct ftb_throttler *t, long long available) { struct active_class_queue *aq = &t->active_q; long long occupied_tokens = 0; @@ -123,16 +123,16 @@ static void ftbf_aq_dequeue(struct ftbf_throttle *t, long long available) { while (aq->len > 0) { - struct ftbf_class_id *cid = &aq->queue[aq->head]; - occupied_tokens = ftbf_estimate_sketch(t->sketch, cid->class_hash); + struct ftbt_class_id *cid = &aq->queue[aq->head]; + occupied_tokens = ftbt_estimate_sketch(t->sketch, cid->class_hash); unsigned char weight = cid->weight; long long alloc_tokens_weighted = aq->alloc_tokens * weight; long long consume = occupied_tokens > alloc_tokens_weighted ? alloc_tokens_weighted : occupied_tokens; if(available - consume >= 0) { - ftbf_update_sketch(&t->sketch, cid->class_hash, -consume); + ftbt_update_sketch(&t->sketch, cid->class_hash, -consume); if(occupied_tokens > alloc_tokens_weighted) - ftbf_aq_enqueue(cid, t); + ftbt_aq_enqueue(cid, t); PN_counter_incrby(t->consumed, consume); available -= consume; }else{ @@ -144,9 +144,9 @@ static void ftbf_aq_dequeue(struct ftbf_throttle *t, long long available) } } -struct ftbf_throttle *ftbf_throttle_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens) +struct ftb_throttler *ftb_throttler_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens) { - struct ftbf_throttle *t=ALLOC(struct ftbf_throttle, 1); + struct ftb_throttler *t=ALLOC(struct ftb_throttler, 1); t->consumed=PN_counter_new(my_id); gettimeofday(&t->cfg.write_timestamp, NULL); memcpy(&t->cfg.write_timestamp, &now, sizeof(t->cfg.write_timestamp)); @@ -166,7 +166,7 @@ struct ftbf_throttle *ftbf_throttle_new(uuid_t my_id, struct timeval now, long l return t; } -void ftbf_throttle_free(struct ftbf_throttle *t) +void ftb_throttler_free(struct ftb_throttler *t) { PN_counter_free(t->consumed); for(size_t i = 0; i < t->sketch.depth; ++i){ @@ -175,7 +175,7 @@ void ftbf_throttle_free(struct ftbf_throttle *t) free(t); } -void ftbf_throttle_configure(struct ftbf_throttle *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms) +void ftb_throttler_configure(struct ftb_throttler *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms) { memcpy(&t->cfg.write_timestamp, &now, sizeof(t->cfg.write_timestamp)); if(CIR>=0) t->cfg.CIR=CIR; @@ -183,7 +183,7 @@ void ftbf_throttle_configure(struct ftbf_throttle *t, struct timeval now, long l if(refill_duration_ms>0) t->cfg.refill_duration_ms=refill_duration_ms; } -long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enum ftbf_throttle_command cmd, long long tokens, const char *req_class_id, unsigned char weight) +long long ftb_throttler_control(struct ftb_throttler *t, struct timeval now, enum ftb_throttler_command cmd, long long tokens, const char *req_class_id, unsigned char weight) { long long delta_time_ms=timeval_delta_ms(t->refill_timestamp, now); long long consumed=PN_counter_get(t->consumed); @@ -193,7 +193,7 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu assert(consumed>=0); int refill_flag=0; long long global_available, local_available; - struct ftbf_class_id cid; + struct ftbt_class_id cid; long long occupied_tokens, token_threshold; int rtn = 0; @@ -202,7 +202,7 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu local_available = global_available; if(delta_time_ms >= t->cfg.refill_duration_ms) { - ftbf_aq_dequeue(t, local_available); + ftbt_aq_dequeue(t, local_available); } consumed = PN_counter_get(t->consumed); refilled = t->refilled; @@ -233,28 +233,28 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu global_available=MAX(refilled-consumed, 0); local_available=global_available; - ftbf_fill_class_id(&cid, t, req_class_id, weight); - ftbf_operate_weight(cid, t); - occupied_tokens = ftbf_estimate_sketch(t->sketch, cid.class_hash); + ftbt_fill_class_id(&cid, t, req_class_id, weight); + ftbt_operate_weight(cid, t); + occupied_tokens = ftbt_estimate_sketch(t->sketch, cid.class_hash); token_threshold = local_available*weight; switch(cmd) { - case FTBF_CMD_CONSUME_FLEXIBLE: + case FTBT_CMD_CONSUME_FLEXIBLE: if (occupied_tokens <= token_threshold) { if (occupied_tokens == 0) - ftbf_aq_enqueue(&cid, t); - ftbf_update_sketch(&t->sketch, cid.class_hash, tokens); + ftbt_aq_enqueue(&cid, t); + ftbt_update_sketch(&t->sketch, cid.class_hash, tokens); rtn++; } break; - case FTBF_CMD_CONSUME_NORMAL: + case FTBT_CMD_CONSUME_NORMAL: if (occupied_tokens <= token_threshold && local_available >= tokens) { if (occupied_tokens == 0) - ftbf_aq_enqueue(&cid, t); - ftbf_update_sketch(&t->sketch, cid.class_hash, tokens); + ftbt_aq_enqueue(&cid, t); + ftbt_update_sketch(&t->sketch, cid.class_hash, tokens); rtn++; } break; @@ -270,17 +270,17 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu return rtn; } -long long ftbf_throttle_read_refilled(struct ftbf_throttle *t) +long long ftbt_throttler_read_refilled(struct ftb_throttler *t) { return t->refilled; } -long long ftbf_throttle_read_available(struct ftbf_throttle *t) +long long ftbt_throttler_read_available(struct ftb_throttler *t) { long long consumed=PN_counter_get(t->consumed); long long global_available=MAX(t->refilled-consumed, 0); return global_available; } -void ftbf_throttle_info(struct ftbf_throttle *t, struct ftbf_throttle_info *info) +void ftb_throttler_info(struct ftb_throttler *t, struct ftb_throttler_info *info) { struct timeval now; gettimeofday(&now, NULL); @@ -290,11 +290,11 @@ void ftbf_throttle_info(struct ftbf_throttle *t, struct ftbf_throttle_info *info info->refill_duration_ms=t->cfg.refill_duration_ms; info->consumed=PN_counter_get(t->consumed); info->refilled=t->refilled; - info->available=ftbf_throttle_read_available(t); + info->available=ftbt_throttler_read_available(t); return; } -void ftbf_throttle_serialize(struct ftbf_throttle *t, char **blob, size_t *blob_sz) +void ftb_throttler_serialize(struct ftb_throttler *t, char **blob, size_t *blob_sz) { mpack_writer_t writer; char *mpack_buff=NULL; @@ -324,9 +324,9 @@ void ftbf_throttle_serialize(struct ftbf_throttle *t, char **blob, size_t *blob_ *blob_sz=mpack_sz; return; } -struct ftbf_throttle *ftbf_throttle_deserialize(const char *blob, size_t blob_sz) +struct ftb_throttler *ftb_throttler_deserialize(const char *blob, size_t blob_sz) { - struct ftbf_throttle *t=ALLOC(struct ftbf_throttle, 1); + struct ftb_throttler *t=ALLOC(struct ftb_throttler, 1); mpack_tree_t tree; mpack_tree_init_data(&tree, blob, blob_sz); mpack_tree_parse(&tree); @@ -357,7 +357,7 @@ struct ftbf_throttle *ftbf_throttle_deserialize(const char *blob, size_t blob_sz } return t; } -void ftbf_throttle_merge(struct ftbf_throttle *src, struct ftbf_throttle *dst) +void ftb_throttler_merge(struct ftb_throttler *src, struct ftb_throttler *dst) { PN_counter_merge(src->consumed, dst->consumed); if(src->refilled > dst->refilled) @@ -371,10 +371,10 @@ void ftbf_throttle_merge(struct ftbf_throttle *src, struct ftbf_throttle *dst) src->refill_timestamp = dst->refill_timestamp; } } -void ftbf_throttle_merge_blob(struct ftbf_throttle *t, const char *blob, size_t blob_sz) +void ftb_throttler_merge_blob(struct ftb_throttler *t, const char *blob, size_t blob_sz) { - struct ftbf_throttle *to_merge=ftbf_throttle_deserialize(blob, blob_sz); - ftbf_throttle_merge(t, to_merge); - ftbf_throttle_free(to_merge); + struct ftb_throttler *to_merge=ftb_throttler_deserialize(blob, blob_sz); + ftb_throttler_merge(t, to_merge); + ftb_throttler_free(to_merge); return; } diff --git a/CRDT/ftb_throttler.h b/CRDT/ftb_throttler.h new file mode 100644 index 0000000..7f20979 --- /dev/null +++ b/CRDT/ftb_throttler.h @@ -0,0 +1,46 @@ +/* +* Fair Token Bucket Throttler CRDT. +* 2023-3-15 +*/ +#pragma once +#include <stddef.h> +#include <sys/time.h> +#include <uuid/uuid.h> +#ifdef __cplusplus +extern "C" +{ +#endif +struct ftb_throttler; +struct ftb_throttler *ftb_throttler_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens); + +void ftb_throttler_configure(struct ftb_throttler *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms); +struct ftb_throttler_info +{ + long long CIR; + long long CBS; + long long refill_duration_ms; + long long consumed; + long long refilled; + long long available; + long long number_of_consumers; +}; +void ftb_throttler_info(struct ftb_throttler *t, struct ftb_throttler_info *info); +void ftb_throttler_free(struct ftb_throttler *t); + +enum ftb_throttler_command +{ + FTBT_CMD_CONSUME_NORMAL, + FTBT_CMD_CONSUME_FLEXIBLE, +}; +long long ftb_throttler_control(struct ftb_throttler *t, struct timeval now, enum ftb_throttler_command cmd, long long tokens, const char *req_class_id, unsigned char weight); + +void ftb_throttler_serialize(struct ftb_throttler *t, char **blob, size_t *blob_sz); +struct ftb_throttler *ftb_throttler_deserialize(const char *blob, size_t blob_sz); + +void ftb_throttler_merge(struct ftb_throttler *src, struct ftb_throttler *dst); +void ftb_throttler_merge_blob(struct ftb_throttler *t, const char *blob, size_t blob_sz); + +#ifdef __cplusplus +} +#endif + diff --git a/CRDT/ftbf_throttle.h b/CRDT/ftbf_throttle.h deleted file mode 100644 index e378397..0000000 --- a/CRDT/ftbf_throttle.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once -#include <stddef.h> -#include <sys/time.h> -#include <uuid/uuid.h> -#ifdef __cplusplus -extern "C" -{ -#endif -struct ftbf_throttle; -struct ftbf_throttle *ftbf_throttle_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens); - -void ftbf_throttle_configure(struct ftbf_throttle *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms); -struct ftbf_throttle_info -{ - long long CIR; - long long CBS; - long long refill_duration_ms; - long long consumed; - long long refilled; - long long available; - long long number_of_consumers; -}; -void ftbf_throttle_info(struct ftbf_throttle *t, struct ftbf_throttle_info *info); -void ftbf_throttle_free(struct ftbf_throttle *t); - -enum ftbf_throttle_command -{ - FTBF_CMD_CONSUME_NORMAL, - FTBF_CMD_CONSUME_FLEXIBLE, -}; -long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enum ftbf_throttle_command cmd, long long tokens, const char *req_class_id, unsigned char weight); - -void ftbf_throttle_serialize(struct ftbf_throttle *t, char **blob, size_t *blob_sz); -struct ftbf_throttle *ftbf_throttle_deserialize(const char *blob, size_t blob_sz); - -void ftbf_throttle_merge(struct ftbf_throttle *src, struct ftbf_throttle *dst); -void ftbf_throttle_merge_blob(struct ftbf_throttle *t, const char *blob, size_t blob_sz); - -#ifdef __cplusplus -} -#endif - diff --git a/CRDT/ftbf_gtest.cpp b/CRDT/ftbt_gtest.cpp index 1e61212..4c536bd 100644 --- a/CRDT/ftbf_gtest.cpp +++ b/CRDT/ftbt_gtest.cpp @@ -1,4 +1,4 @@ -#include "ftbf_throttle.h" +#include "ftb_throttler.h" #include "crdt_utils.h" #include "ransampl.h" #include <cstddef> @@ -10,63 +10,63 @@ #include "util/flow_tokens.h" #include "util/fair_index.h" -void ftbf_throttle_sync(struct ftbf_throttle *list[], size_t n) +void ftbt_throttle_sync(struct ftb_throttler *list[], size_t n) { char *blob=NULL; size_t blob_sz=0; for(size_t i=0; i<n; i++) { - ftbf_throttle_serialize(list[i], &blob, &blob_sz); + ftb_throttler_serialize(list[i], &blob, &blob_sz); for(size_t j=0; j<n; j++) { if(j==i) continue; - ftbf_throttle_merge_blob(list[j], blob, blob_sz); + ftb_throttler_merge_blob(list[j], blob, blob_sz); } free(blob); blob=NULL; } } -TEST(FTBF, Basic) +TEST(FTBT, Basic) { uuid_t uuid; uuid_generate(uuid); - struct ftbf_throttle *bucket=NULL; - struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1); + struct ftb_throttler *bucket=NULL; + struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1); long long CIR=100; long long CBS=200; struct timeval now; gettimeofday(&now, NULL); - bucket=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10); + bucket=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10); long long rtn = 0; - rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 60, "1", 1); - ftbf_throttle_info(bucket, info); + rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 60, "1", 1); + ftb_throttler_info(bucket, info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); - rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_FLEXIBLE, 200, "1", 1); - ftbf_throttle_info(bucket, info); + rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_FLEXIBLE, 200, "1", 1); + ftb_throttler_info(bucket, info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); - rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 60, "1", 1); - ftbf_throttle_info(bucket, info); + rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 60, "1", 1); + ftb_throttler_info(bucket, info); EXPECT_EQ(rtn, 0); EXPECT_EQ(info->available, 200); - ftbf_throttle_free(bucket); + ftb_throttler_free(bucket); } -TEST(FTBF, BasicWeighted) +TEST(FTBT, BasicWeighted) { uuid_t uuid; uuid_generate(uuid); - struct ftbf_throttle *bucket=NULL; - struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1); + struct ftb_throttler *bucket=NULL; + struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1); long long CIR=100*1000; //100*1000 bps long long CBS=200; struct timeval now, step; @@ -75,34 +75,34 @@ TEST(FTBF, BasicWeighted) step.tv_sec=0; step.tv_usec=(suseconds_t)1000; - bucket=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10); + bucket=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10); long long rtn = 0; - rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 200, "1", 1); //active list : 200 - ftbf_throttle_info(bucket, info); + rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 200, "1", 1); //active list : 200 + ftb_throttler_info(bucket, info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); - rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 160, "2", 2);//active list : 200 160 - ftbf_throttle_info(bucket, info); + rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 160, "2", 2);//active list : 200 160 + ftb_throttler_info(bucket, info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); timeradd(&now, &step, &now); - rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 20, "1", 1);//active list : 130 30 - ftbf_throttle_info(bucket, info); + rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 20, "1", 1);//active list : 130 30 + ftb_throttler_info(bucket, info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 100); - ftbf_throttle_free(bucket); + ftb_throttler_free(bucket); } -TEST(FTBF, Merge) +TEST(FTBT, Merge) { uuid_t uuid; - struct ftbf_throttle *bucket[2]; - struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1); + struct ftb_throttler *bucket[2]; + struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1); long long CIR=100*1000; //100*1000 bps long long CBS=200; long long rtn = 0; @@ -113,39 +113,39 @@ TEST(FTBF, Merge) step.tv_usec=(suseconds_t)1000; uuid_generate(uuid); - bucket[0]=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10); + bucket[0]=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10); uuid_generate(uuid); - bucket[1]=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10); + bucket[1]=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10); - rtn=ftbf_throttle_control(bucket[0], now, FTBF_CMD_CONSUME_NORMAL, 200, "1", 1); - ftbf_throttle_info(bucket[0], info); + rtn=ftb_throttler_control(bucket[0], now, FTBT_CMD_CONSUME_NORMAL, 200, "1", 1); + ftb_throttler_info(bucket[0], info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); - rtn=ftbf_throttle_control(bucket[1], now, FTBF_CMD_CONSUME_NORMAL, 50, "2", 1); - ftbf_throttle_info(bucket[1], info); + rtn=ftb_throttler_control(bucket[1], now, FTBT_CMD_CONSUME_NORMAL, 50, "2", 1); + ftb_throttler_info(bucket[1], info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); timeradd(&now, &step, &now); - rtn=ftbf_throttle_control(bucket[0], now, FTBF_CMD_CONSUME_NORMAL, 0, "1", 1); //300-200=100 - ftbf_throttle_info(bucket[0], info); + rtn=ftb_throttler_control(bucket[0], now, FTBT_CMD_CONSUME_NORMAL, 0, "1", 1); //300-200=100 + ftb_throttler_info(bucket[0], info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 100); - rtn=ftbf_throttle_control(bucket[1], now, FTBF_CMD_CONSUME_NORMAL, 0, "2", 1);//300-50=250(200) - ftbf_throttle_info(bucket[1], info); + rtn=ftb_throttler_control(bucket[1], now, FTBT_CMD_CONSUME_NORMAL, 0, "2", 1);//300-50=250(200) + ftb_throttler_info(bucket[1], info); EXPECT_EQ(rtn, 1); EXPECT_EQ(info->available, 200); - ftbf_throttle_sync(bucket, 2); + ftbt_throttle_sync(bucket, 2); - ftbf_throttle_info(bucket[0], info); + ftb_throttler_info(bucket[0], info); EXPECT_EQ(info->available, 50); - ftbf_throttle_info(bucket[1], info); + ftb_throttler_info(bucket[1], info); EXPECT_EQ(info->available, 50); - ftbf_throttle_free(bucket[0]); - ftbf_throttle_free(bucket[1]); + ftb_throttler_free(bucket[0]); + ftb_throttler_free(bucket[1]); } #define FAIR_INDEX_CIR 1*1024*1000 //1024 bit/ms @@ -153,7 +153,7 @@ TEST(FTBF, Merge) #define FAIR_INDEX_REQ FAIR_INDEX_CIR/1000 #define FAIR_INDEX_MIMIC_DURATION_US 1*1000*1000 -TEST(FTBF, Pareto) +TEST(FTBT, Pareto) { int class_num = 24; enum flow_type ftype = PARETO; @@ -161,9 +161,9 @@ TEST(FTBF, Pareto) uuid_generate(uuid); struct timeval start; gettimeofday(&start, NULL); - struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1); - struct ftbf_throttle *bucket=NULL; - bucket=ftbf_throttle_new(uuid, start, FAIR_INDEX_CIR, FAIR_INDEX_CBS, FAIR_INDEX_CIR/1000/10); + struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1); + struct ftb_throttler *bucket=NULL; + bucket=ftb_throttler_new(uuid, start, FAIR_INDEX_CIR, FAIR_INDEX_CBS, FAIR_INDEX_CIR/1000/10); size_t i,j,k; long long tokens=0; long long consumed_class[class_num]; @@ -200,7 +200,7 @@ TEST(FTBF, Pareto) tokens = 0.2*FAIR_INDEX_REQ+rand()%5; requested += tokens; sprintf(str, "%zu", j); - int rtn = ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_FLEXIBLE, tokens, str, class_weight[j]); + int rtn = ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_FLEXIBLE, tokens, str, class_weight[j]); ftbf_result.cnt++; ftbf_class_results[j].cnt++; if(rtn > 0) @@ -216,7 +216,7 @@ TEST(FTBF, Pareto) } upper_limit=FAIR_INDEX_CBS+FAIR_INDEX_CIR/1000*timeval_delta_ms(start, now); - ftbf_throttle_info(bucket, info); + ftb_throttler_info(bucket, info); refilled=info->refilled; for(i=0; i<(size_t)class_num; ++i) { @@ -235,7 +235,8 @@ TEST(FTBF, Pareto) consumed, jain_fair_index); ransampl_free(ws); - ftbf_throttle_free(bucket); + ftb_throttler_free(bucket); + EXPECT_NEAR(jain_fair_index, 1, 0.01); } |
