summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-03-15 22:21:10 +0800
committeryangwei <[email protected]>2023-03-15 22:24:52 +0800
commit2f759b2e0afa5ab298d2e5bd0df8439a3dba624a (patch)
tree351feef03d718b118fee0b943a9213bdc1e980e9
parent2382859c91b5feb95cfceda1c4200ae91c071a8d (diff)
✨ feat(fair token bucket throttler): Added ftbt and test casesFeature-add-FTBT
-rw-r--r--CRDT/CMakeLists.txt6
-rw-r--r--CRDT/crdt_gtest.cpp175
-rw-r--r--CRDT/ftb_throttler.c (renamed from CRDT/ftbf_throttle.c)90
-rw-r--r--CRDT/ftb_throttler.h46
-rw-r--r--CRDT/ftbf_throttle.h42
-rw-r--r--CRDT/ftbt_gtest.cpp (renamed from CRDT/ftbf_gtest.cpp)103
6 files changed, 186 insertions, 276 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt
index 52d19dc..cbfc9aa 100644
--- a/CRDT/CMakeLists.txt
+++ b/CRDT/CMakeLists.txt
@@ -1,7 +1,7 @@
add_definitions(-D_GNU_SOURCE)
add_definitions(-fPIC)
-add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c oc_token_bucket.c ftbf_throttle.c)
+add_library(CRDT lww_register.c pn_counter.c or_map.c or_set.c oc_token_bucket.c ftb_throttler.c)
include_directories(${PROJECT_SOURCE_DIR}/deps/mpack
${PROJECT_SOURCE_DIR}/deps/uthash
@@ -11,9 +11,9 @@ include_directories(${PROJECT_SOURCE_DIR}/deps/mpack
add_executable(CRDT_gtest crdt_gtest.cpp ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c)
target_link_libraries(CRDT_gtest CRDT gtest-static uuid)
-add_executable(FTBF_gtest ftbf_gtest.cpp ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c
+add_executable(FTBT_gtest ftbt_gtest.cpp ${PROJECT_SOURCE_DIR}/deps/mpack/mpack.c
${PROJECT_SOURCE_DIR}/deps/hashfun/hash_fun.c
${PROJECT_SOURCE_DIR}/deps/alias/ransampl.c
${PROJECT_SOURCE_DIR}/CRDT/util/fair_index.cpp
${PROJECT_SOURCE_DIR}/CRDT/util/flow_tokens.cpp)
-target_link_libraries(FTBF_gtest CRDT gtest-static uuid) \ No newline at end of file
+target_link_libraries(FTBT_gtest CRDT gtest-static uuid) \ No newline at end of file
diff --git a/CRDT/crdt_gtest.cpp b/CRDT/crdt_gtest.cpp
index f418585..3166d3c 100644
--- a/CRDT/crdt_gtest.cpp
+++ b/CRDT/crdt_gtest.cpp
@@ -35,87 +35,56 @@ enum traffic_type
LIGHT_UNIFORM_TYPE,
LIGHT_TWO_EIGHT_TYPE,
HEAVY_TWO_EIGHT_TYPE,
- HEAVY_UNIFORM_TYPE
+ HEAVY_UNIFORM_EXTREME_TYPE
};
-enum configure_command
-{
- ASYNC_DEFAULT,
- ASYNC_ADD_NODE,
- ASYNC_ADJUST_MERGE,
- CONCURRNET_DEFAULT,
- CONCURRENT_ADD_NODE,
- CONCURRENT_ADJUST_MERGE
-};
-
-long long get_request_tokens(size_t index, enum traffic_type type, long long step_us, long long CIR, long long CBS, size_t replica_num, size_t new_replica_bindex)
+long long get_request_tokens(int index, enum traffic_type type, long long step_us, long long CIR, long long CBS)
{
long long request_size;
- double average_ratio2, average_ratio8, total_ratio2, total_ratio8;
- long long standard = CIR*step_us/1000000;
- long long sdf = floor((long double)standard * 0.05); //floating 5%
- size_t occupy8_replica_num = floor(replica_num * 0.8);
- long long rand_sdf = random()%sdf;
- size_t scope_flag = FALSE;
- if(random()%2) scope_flag = TRUE;
- if(new_replica_bindex > 0 && index >= new_replica_bindex)
- {
- request_size = (long long)floor((long double)standard * 0.3); //request for new nodes
- if (scope_flag) {
- request_size += rand_sdf;
- } else {
- request_size -= rand_sdf;
- }
- return request_size;
- }
+ long long standard = CIR * step_us / 1000000;
+ long long sd10 = floor((long double)standard * 0.1);
+ int eight_replica_num = floor(REPLICA_NUMBER * 0.8);
+ long long rand_sd10 = random() % sd10;
+ int scope_flag = FALSE;
+ if (random() % 2) scope_flag = TRUE;
switch (type)
{
case LIGHT_UNIFORM_TYPE:
request_size = (long long)floor((long double)standard * 0.5);
if (scope_flag) {
- request_size += rand_sdf;
+ request_size += rand_sd10;
} else {
- request_size -= rand_sdf;
+ request_size -= rand_sd10;
}
break;
case LIGHT_TWO_EIGHT_TYPE:
- //average 0.67*CIR per node
- total_ratio2 = (0.67*replica_num) / 5;
- total_ratio8 = (0.67*replica_num) - total_ratio2;
- average_ratio2 = total_ratio2 / occupy8_replica_num;
- average_ratio8 = total_ratio8 / (replica_num-occupy8_replica_num);
- if (index < occupy8_replica_num && scope_flag) {
- request_size = (long long)floor((long double)standard * average_ratio2) + rand_sdf;
- } else if (index < occupy8_replica_num && !scope_flag) {
- request_size = (long long)floor((long double)standard * average_ratio2) - rand_sdf;
- } else if (index >= occupy8_replica_num && scope_flag) {
- request_size = (long long)floor((long double)standard * average_ratio8) + rand_sdf;
+ if (index < eight_replica_num && scope_flag) {
+ request_size = (long long)floor((long double)standard * 0.2) + rand_sd10;
+ } else if (index < eight_replica_num && !scope_flag) {
+ request_size = (long long)floor((long double)standard * 0.2) - rand_sd10;
+ } else if (index >= eight_replica_num && scope_flag) {
+ request_size = (long long)floor((long double)standard * 1.6) + rand_sd10;
} else {
- request_size = (long long)floor((long double)standard * average_ratio8) - rand_sdf;
+ request_size = (long long)floor((long double)standard * 1.6) - rand_sd10;
}
break;
case HEAVY_TWO_EIGHT_TYPE:
- //average 2*CIR per node
- total_ratio2 = (2.0*replica_num) / 5;
- total_ratio8 = (2.0*replica_num) - total_ratio2;
- average_ratio2 = total_ratio2 / occupy8_replica_num;
- average_ratio8 = total_ratio8 / (replica_num-occupy8_replica_num);
- if (index < occupy8_replica_num && scope_flag) {
- request_size = (long long)floor((long double)standard * average_ratio2) + rand_sdf;
- } else if (index < occupy8_replica_num && !scope_flag) {
- request_size = (long long)floor((long double)standard * average_ratio2) - rand_sdf;
- } else if (index >= occupy8_replica_num && scope_flag) {
- request_size = (long long)floor((long double)standard * average_ratio8) + rand_sdf;
+ if (index < eight_replica_num && scope_flag) {
+ request_size = (long long)floor((long double)standard * 0.6) + rand_sd10;
+ } else if (index < eight_replica_num && !scope_flag) {
+ request_size = (long long)floor((long double)standard * 0.6) - rand_sd10;
+ } else if (index >= eight_replica_num && scope_flag) {
+ request_size = (long long)floor((long double)standard * 4.8) + rand_sd10;
} else {
- request_size = (long long)floor((long double)standard * average_ratio8) - rand_sdf;
+ request_size = (long long)floor((long double)standard * 4.8) - rand_sd10;
}
break;
- case HEAVY_UNIFORM_TYPE:
+ case HEAVY_UNIFORM_EXTREME_TYPE:
request_size = (long long)floor((long double)standard * 2);
if (scope_flag) {
- request_size += rand_sdf;
+ request_size += rand_sd10;
} else {
- request_size -= rand_sdf;
+ request_size -= rand_sd10;
}
break;
default:
@@ -124,18 +93,16 @@ long long get_request_tokens(size_t index, enum traffic_type type, long long ste
return request_size;
}
-void traffic_distribution(traffic_type type, enum configure_command configure=ASYNC_DEFAULT)
+void traffic_distribution(traffic_type type)
{
- size_t init_replica_num = REPLICA_NUMBER, new_replica_bindex = 0;
- size_t replica_num = init_replica_num;
- struct OC_token_bucket *buckets[100];
+ struct OC_token_bucket *buckets[REPLICA_NUMBER];
size_t i = 0, j = 0;
long long CIR = 1*1024*1024; // 1Mps
long long CBS = 2*1024*1024;
uuid_t uuid;
struct timeval start;
gettimeofday(&start, NULL);
- for (i = 0; i < replica_num; i++)
+ for (i = 0; i < REPLICA_NUMBER; i++)
{
uuid_generate(uuid);
buckets[i] = OC_token_bucket_new(uuid, start, CIR, CBS);
@@ -148,56 +115,27 @@ void traffic_distribution(traffic_type type, enum configure_command configure=AS
memcpy(&now, &start, sizeof(now));
step.tv_sec = 0;
step.tv_usec = (suseconds_t)step_us;
- int concurrent_flag = FALSE;
- if(configure == CONCURRNET_DEFAULT || configure == CONCURRENT_ADD_NODE || configure == CONCURRENT_ADJUST_MERGE)
- {
- concurrent_flag = TRUE;
- }
for (i = 0; (long long)i < mimic_duration_us / step_us; i++)
{
+ j = i % 3; // sequence selection
timeradd(&now, &step, &now);
- for(j = 0; j < replica_num; j++)
- {
- if(!concurrent_flag)
- {
- j = i % replica_num; // sequence selection
- tokens = get_request_tokens(j, type, step_us, CIR, CBS, init_replica_num, new_replica_bindex);
- }else
- {
- tokens = get_request_tokens(j, type, step_us, CIR, CBS, init_replica_num, new_replica_bindex);
- if(new_replica_bindex <= 0 || j < new_replica_bindex)
- tokens = round((long double)tokens/init_replica_num);
- }
- flexible_tokens = OC_token_bucket_control(buckets[j], now, OCTB_CMD_CONSUME_FLEXIBLE, tokens);
- requested += tokens;
- consumed += flexible_tokens;
- if(configure == ASYNC_ADJUST_MERGE || configure == CONCURRENT_ADJUST_MERGE)
- {
- if(i % 10000 == 0)
- {
- OC_token_bucket_sync(buckets, replica_num);
- }
- }else
- {
- OC_token_bucket_sync(buckets, replica_num);
- }
- if(!concurrent_flag) break;
- }
- if((configure == ASYNC_ADD_NODE || configure == CONCURRENT_ADD_NODE) && ((i == 10000) || (i == 50000))) //two replica added
+ tokens = get_request_tokens(j, type, step_us, CIR, CBS);
+ flexible_tokens = OC_token_bucket_control(buckets[j], now, OCTB_CMD_CONSUME_FLEXIBLE, tokens);
+ requested += tokens;
+ consumed += flexible_tokens;
+ if(i%100==0)
{
- uuid_generate(uuid);
- buckets[replica_num] = OC_token_bucket_new((const char *)uuid, sizeof(uuid), now, CIR, CBS);
- new_replica_bindex = replica_num;
- replica_num++;
+ OC_token_bucket_sync(buckets, REPLICA_NUMBER);
}
}
upper_limit = CBS + CIR * timeval_delta_ms(start, now) / 1000;
refilled = OC_token_bucket_control(buckets[0], now, OCTB_CMD_READ_REFILLED, 0);
EXPECT_LE(consumed, requested);
double accuracy = (double)consumed / MIN(refilled, requested);
+ EXPECT_NEAR(accuracy, 1, 0.01);
printf("accuracy:%f, upper_limit:%lld, refilled:%lld, requested:%lld, consumed:%lld\n",
accuracy, upper_limit, refilled, requested, consumed);
- for(i = 0; i < replica_num; i++)
+ for(i = 0; i < REPLICA_NUMBER; i++)
{
OC_token_bucket_free(buckets[i]);
}
@@ -205,42 +143,10 @@ void traffic_distribution(traffic_type type, enum configure_command configure=AS
TEST(OCTokenBucket, TrafficTypeConsumer)
{
- printf("[ASYNC_DEFAULT]\n");
traffic_distribution(LIGHT_UNIFORM_TYPE);
traffic_distribution(LIGHT_TWO_EIGHT_TYPE);
traffic_distribution(HEAVY_TWO_EIGHT_TYPE);
- traffic_distribution(HEAVY_UNIFORM_TYPE);
-
- //异步模式下的追加节点,UNIFORM类型请求量降低,主要是因为总请求量一定,新加节点请求量低于平均水平
- printf("[ASYNC_ADD_NODE]\n");
- traffic_distribution(LIGHT_UNIFORM_TYPE, ASYNC_ADD_NODE);
- traffic_distribution(LIGHT_TWO_EIGHT_TYPE, ASYNC_ADD_NODE);
- traffic_distribution(HEAVY_TWO_EIGHT_TYPE, ASYNC_ADD_NODE);
- traffic_distribution(HEAVY_UNIFORM_TYPE, ASYNC_ADD_NODE);
-
- printf("[ASYNC_ADJUST_MERGE]\n");
- traffic_distribution(LIGHT_UNIFORM_TYPE, ASYNC_ADJUST_MERGE);
- traffic_distribution(LIGHT_TWO_EIGHT_TYPE, ASYNC_ADJUST_MERGE);
- traffic_distribution(HEAVY_TWO_EIGHT_TYPE, ASYNC_ADJUST_MERGE);
- traffic_distribution(HEAVY_UNIFORM_TYPE, ASYNC_ADJUST_MERGE);
-
- printf("[CONCURRENT_DEFAULT]\n");
- traffic_distribution(LIGHT_UNIFORM_TYPE, CONCURRNET_DEFAULT);
- traffic_distribution(LIGHT_TWO_EIGHT_TYPE, CONCURRNET_DEFAULT);
- traffic_distribution(HEAVY_TWO_EIGHT_TYPE, CONCURRNET_DEFAULT);
- traffic_distribution(HEAVY_UNIFORM_TYPE, CONCURRNET_DEFAULT);
-
- printf("[CONCURRENT_ADD_NODE]\n");
- traffic_distribution(LIGHT_UNIFORM_TYPE, CONCURRENT_ADD_NODE);
- traffic_distribution(LIGHT_TWO_EIGHT_TYPE, CONCURRENT_ADD_NODE);
- traffic_distribution(HEAVY_TWO_EIGHT_TYPE, CONCURRENT_ADD_NODE);
- traffic_distribution(HEAVY_UNIFORM_TYPE, CONCURRENT_ADD_NODE);
-
- printf("[CONCURRENT_ADJUST_MERGE]\n");
- traffic_distribution(LIGHT_UNIFORM_TYPE, CONCURRENT_ADJUST_MERGE);
- traffic_distribution(LIGHT_TWO_EIGHT_TYPE, CONCURRENT_ADJUST_MERGE);
- traffic_distribution(HEAVY_TWO_EIGHT_TYPE, CONCURRENT_ADJUST_MERGE);
- traffic_distribution(HEAVY_UNIFORM_TYPE, CONCURRENT_ADJUST_MERGE);
+ traffic_distribution(HEAVY_UNIFORM_EXTREME_TYPE);
}
TEST(OCTokenBucket, Basic)
@@ -460,7 +366,6 @@ TEST(OCTokenBucket, LightConsumer)
}
}
}
-
TEST(OCTokenBucket, ConcurrentHeavyConsumer)
{
struct OC_token_bucket *buckets[REPLICA_NUMBER];
diff --git a/CRDT/ftbf_throttle.c b/CRDT/ftb_throttler.c
index 25374c6..0c6d0ae 100644
--- a/CRDT/ftbf_throttle.c
+++ b/CRDT/ftb_throttler.c
@@ -1,4 +1,4 @@
-#include "ftbf_throttle.h"
+#include "ftb_throttler.h"
#include "crdt_utils.h"
#include "hash_fun.h"
#include "pn_counter.h"
@@ -12,7 +12,7 @@
#define MAX_SKETCH_WIDTH 1024
#define MAX_QUEUE_DEPTH 2000
-struct ftbf_class_id
+struct ftbt_class_id
{
long long class_hash[MAX_SKETCH_DEPTH];
unsigned char weight;
@@ -20,7 +20,7 @@ struct ftbf_class_id
struct active_class_queue
{
- struct ftbf_class_id queue[MAX_QUEUE_DEPTH];
+ struct ftbt_class_id queue[MAX_QUEUE_DEPTH];
long long head;
long long tail;
long long len;
@@ -34,16 +34,16 @@ struct count_min_sketch
size_t width;
};
-struct ftbf_configuration
+struct ftbt_configuration
{
long long CIR; //Committed Information Rate
long long CBS; //Committed Burst Size
long long refill_duration_ms;
struct timeval write_timestamp;
};
-struct ftbf_throttle
+struct ftb_throttler
{
- struct ftbf_configuration cfg;
+ struct ftbt_configuration cfg;
long long refilled;
struct timeval refill_timestamp;
struct PN_counter *consumed;
@@ -51,7 +51,7 @@ struct ftbf_throttle
struct active_class_queue active_q;
};
-static void ftbf_fill_class_id(struct ftbf_class_id *cid, struct ftbf_throttle *t, const char *req_class_id, unsigned char weight)
+static void ftbt_fill_class_id(struct ftbt_class_id *cid, struct ftb_throttler *t, const char *req_class_id, unsigned char weight)
{
for(size_t i = 0; i < t->sketch.depth; ++i)
{
@@ -60,7 +60,7 @@ static void ftbf_fill_class_id(struct ftbf_class_id *cid, struct ftbf_throttle *
cid->weight = weight;
}
-static void ftbf_update_sketch(struct count_min_sketch *sketch, long long *hash, long long count)
+static void ftbt_update_sketch(struct count_min_sketch *sketch, long long *hash, long long count)
{
size_t i = 0;
long long hashval = 0;
@@ -70,7 +70,7 @@ static void ftbf_update_sketch(struct count_min_sketch *sketch, long long *hash,
sketch->occupied_token[i][hashval] += count;
}
}
-static long long ftbf_estimate_sketch(struct count_min_sketch sketch, long long *hash)
+static long long ftbt_estimate_sketch(struct count_min_sketch sketch, long long *hash)
{
size_t i = 0;
long long minval = LLONG_MAX;
@@ -83,7 +83,7 @@ static long long ftbf_estimate_sketch(struct count_min_sketch sketch, long long
return minval;
}
-static int ftbf_aq_enqueue(struct ftbf_class_id *cid, struct ftbf_throttle *t)
+static int ftbt_aq_enqueue(struct ftbt_class_id *cid, struct ftb_throttler *t)
{
struct active_class_queue *aq = &t->active_q;
if(aq->len >= MAX_QUEUE_DEPTH)
@@ -91,14 +91,14 @@ static int ftbf_aq_enqueue(struct ftbf_class_id *cid, struct ftbf_throttle *t)
printf("Unexpected: active flow queue full.\n");
return FALSE;
}
- memcpy(&(aq->queue[aq->tail]), cid, sizeof(struct ftbf_class_id));
+ memcpy(&(aq->queue[aq->tail]), cid, sizeof(struct ftbt_class_id));
aq->tail = (aq->tail + 1) % MAX_QUEUE_DEPTH;
aq->len++;
return TRUE;
}
//update weight
-static void ftbf_operate_weight(struct ftbf_class_id cid, struct ftbf_throttle *t)
+static void ftbt_operate_weight(struct ftbt_class_id cid, struct ftb_throttler *t)
{
struct active_class_queue *aq = &t->active_q;
size_t i,j;
@@ -115,7 +115,7 @@ static void ftbf_operate_weight(struct ftbf_class_id cid, struct ftbf_throttle *
aq->queue[i].weight = cid.weight;
}
-static void ftbf_aq_dequeue(struct ftbf_throttle *t, long long available)
+static void ftbt_aq_dequeue(struct ftb_throttler *t, long long available)
{
struct active_class_queue *aq = &t->active_q;
long long occupied_tokens = 0;
@@ -123,16 +123,16 @@ static void ftbf_aq_dequeue(struct ftbf_throttle *t, long long available)
{
while (aq->len > 0)
{
- struct ftbf_class_id *cid = &aq->queue[aq->head];
- occupied_tokens = ftbf_estimate_sketch(t->sketch, cid->class_hash);
+ struct ftbt_class_id *cid = &aq->queue[aq->head];
+ occupied_tokens = ftbt_estimate_sketch(t->sketch, cid->class_hash);
unsigned char weight = cid->weight;
long long alloc_tokens_weighted = aq->alloc_tokens * weight;
long long consume = occupied_tokens > alloc_tokens_weighted ? alloc_tokens_weighted : occupied_tokens;
if(available - consume >= 0)
{
- ftbf_update_sketch(&t->sketch, cid->class_hash, -consume);
+ ftbt_update_sketch(&t->sketch, cid->class_hash, -consume);
if(occupied_tokens > alloc_tokens_weighted)
- ftbf_aq_enqueue(cid, t);
+ ftbt_aq_enqueue(cid, t);
PN_counter_incrby(t->consumed, consume);
available -= consume;
}else{
@@ -144,9 +144,9 @@ static void ftbf_aq_dequeue(struct ftbf_throttle *t, long long available)
}
}
-struct ftbf_throttle *ftbf_throttle_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens)
+struct ftb_throttler *ftb_throttler_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens)
{
- struct ftbf_throttle *t=ALLOC(struct ftbf_throttle, 1);
+ struct ftb_throttler *t=ALLOC(struct ftb_throttler, 1);
t->consumed=PN_counter_new(my_id);
gettimeofday(&t->cfg.write_timestamp, NULL);
memcpy(&t->cfg.write_timestamp, &now, sizeof(t->cfg.write_timestamp));
@@ -166,7 +166,7 @@ struct ftbf_throttle *ftbf_throttle_new(uuid_t my_id, struct timeval now, long l
return t;
}
-void ftbf_throttle_free(struct ftbf_throttle *t)
+void ftb_throttler_free(struct ftb_throttler *t)
{
PN_counter_free(t->consumed);
for(size_t i = 0; i < t->sketch.depth; ++i){
@@ -175,7 +175,7 @@ void ftbf_throttle_free(struct ftbf_throttle *t)
free(t);
}
-void ftbf_throttle_configure(struct ftbf_throttle *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms)
+void ftb_throttler_configure(struct ftb_throttler *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms)
{
memcpy(&t->cfg.write_timestamp, &now, sizeof(t->cfg.write_timestamp));
if(CIR>=0) t->cfg.CIR=CIR;
@@ -183,7 +183,7 @@ void ftbf_throttle_configure(struct ftbf_throttle *t, struct timeval now, long l
if(refill_duration_ms>0) t->cfg.refill_duration_ms=refill_duration_ms;
}
-long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enum ftbf_throttle_command cmd, long long tokens, const char *req_class_id, unsigned char weight)
+long long ftb_throttler_control(struct ftb_throttler *t, struct timeval now, enum ftb_throttler_command cmd, long long tokens, const char *req_class_id, unsigned char weight)
{
long long delta_time_ms=timeval_delta_ms(t->refill_timestamp, now);
long long consumed=PN_counter_get(t->consumed);
@@ -193,7 +193,7 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu
assert(consumed>=0);
int refill_flag=0;
long long global_available, local_available;
- struct ftbf_class_id cid;
+ struct ftbt_class_id cid;
long long occupied_tokens, token_threshold;
int rtn = 0;
@@ -202,7 +202,7 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu
local_available = global_available;
if(delta_time_ms >= t->cfg.refill_duration_ms)
{
- ftbf_aq_dequeue(t, local_available);
+ ftbt_aq_dequeue(t, local_available);
}
consumed = PN_counter_get(t->consumed);
refilled = t->refilled;
@@ -233,28 +233,28 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu
global_available=MAX(refilled-consumed, 0);
local_available=global_available;
- ftbf_fill_class_id(&cid, t, req_class_id, weight);
- ftbf_operate_weight(cid, t);
- occupied_tokens = ftbf_estimate_sketch(t->sketch, cid.class_hash);
+ ftbt_fill_class_id(&cid, t, req_class_id, weight);
+ ftbt_operate_weight(cid, t);
+ occupied_tokens = ftbt_estimate_sketch(t->sketch, cid.class_hash);
token_threshold = local_available*weight;
switch(cmd)
{
- case FTBF_CMD_CONSUME_FLEXIBLE:
+ case FTBT_CMD_CONSUME_FLEXIBLE:
if (occupied_tokens <= token_threshold)
{
if (occupied_tokens == 0)
- ftbf_aq_enqueue(&cid, t);
- ftbf_update_sketch(&t->sketch, cid.class_hash, tokens);
+ ftbt_aq_enqueue(&cid, t);
+ ftbt_update_sketch(&t->sketch, cid.class_hash, tokens);
rtn++;
}
break;
- case FTBF_CMD_CONSUME_NORMAL:
+ case FTBT_CMD_CONSUME_NORMAL:
if (occupied_tokens <= token_threshold && local_available >= tokens)
{
if (occupied_tokens == 0)
- ftbf_aq_enqueue(&cid, t);
- ftbf_update_sketch(&t->sketch, cid.class_hash, tokens);
+ ftbt_aq_enqueue(&cid, t);
+ ftbt_update_sketch(&t->sketch, cid.class_hash, tokens);
rtn++;
}
break;
@@ -270,17 +270,17 @@ long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enu
return rtn;
}
-long long ftbf_throttle_read_refilled(struct ftbf_throttle *t)
+long long ftbt_throttler_read_refilled(struct ftb_throttler *t)
{
return t->refilled;
}
-long long ftbf_throttle_read_available(struct ftbf_throttle *t)
+long long ftbt_throttler_read_available(struct ftb_throttler *t)
{
long long consumed=PN_counter_get(t->consumed);
long long global_available=MAX(t->refilled-consumed, 0);
return global_available;
}
-void ftbf_throttle_info(struct ftbf_throttle *t, struct ftbf_throttle_info *info)
+void ftb_throttler_info(struct ftb_throttler *t, struct ftb_throttler_info *info)
{
struct timeval now;
gettimeofday(&now, NULL);
@@ -290,11 +290,11 @@ void ftbf_throttle_info(struct ftbf_throttle *t, struct ftbf_throttle_info *info
info->refill_duration_ms=t->cfg.refill_duration_ms;
info->consumed=PN_counter_get(t->consumed);
info->refilled=t->refilled;
- info->available=ftbf_throttle_read_available(t);
+ info->available=ftbt_throttler_read_available(t);
return;
}
-void ftbf_throttle_serialize(struct ftbf_throttle *t, char **blob, size_t *blob_sz)
+void ftb_throttler_serialize(struct ftb_throttler *t, char **blob, size_t *blob_sz)
{
mpack_writer_t writer;
char *mpack_buff=NULL;
@@ -324,9 +324,9 @@ void ftbf_throttle_serialize(struct ftbf_throttle *t, char **blob, size_t *blob_
*blob_sz=mpack_sz;
return;
}
-struct ftbf_throttle *ftbf_throttle_deserialize(const char *blob, size_t blob_sz)
+struct ftb_throttler *ftb_throttler_deserialize(const char *blob, size_t blob_sz)
{
- struct ftbf_throttle *t=ALLOC(struct ftbf_throttle, 1);
+ struct ftb_throttler *t=ALLOC(struct ftb_throttler, 1);
mpack_tree_t tree;
mpack_tree_init_data(&tree, blob, blob_sz);
mpack_tree_parse(&tree);
@@ -357,7 +357,7 @@ struct ftbf_throttle *ftbf_throttle_deserialize(const char *blob, size_t blob_sz
}
return t;
}
-void ftbf_throttle_merge(struct ftbf_throttle *src, struct ftbf_throttle *dst)
+void ftb_throttler_merge(struct ftb_throttler *src, struct ftb_throttler *dst)
{
PN_counter_merge(src->consumed, dst->consumed);
if(src->refilled > dst->refilled)
@@ -371,10 +371,10 @@ void ftbf_throttle_merge(struct ftbf_throttle *src, struct ftbf_throttle *dst)
src->refill_timestamp = dst->refill_timestamp;
}
}
-void ftbf_throttle_merge_blob(struct ftbf_throttle *t, const char *blob, size_t blob_sz)
+void ftb_throttler_merge_blob(struct ftb_throttler *t, const char *blob, size_t blob_sz)
{
- struct ftbf_throttle *to_merge=ftbf_throttle_deserialize(blob, blob_sz);
- ftbf_throttle_merge(t, to_merge);
- ftbf_throttle_free(to_merge);
+ struct ftb_throttler *to_merge=ftb_throttler_deserialize(blob, blob_sz);
+ ftb_throttler_merge(t, to_merge);
+ ftb_throttler_free(to_merge);
return;
}
diff --git a/CRDT/ftb_throttler.h b/CRDT/ftb_throttler.h
new file mode 100644
index 0000000..7f20979
--- /dev/null
+++ b/CRDT/ftb_throttler.h
@@ -0,0 +1,46 @@
+/*
+* Fair Token Bucket Throttler CRDT.
+* 2023-3-15
+*/
+#pragma once
+#include <stddef.h>
+#include <sys/time.h>
+#include <uuid/uuid.h>
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+struct ftb_throttler;
+struct ftb_throttler *ftb_throttler_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens);
+
+void ftb_throttler_configure(struct ftb_throttler *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms);
+struct ftb_throttler_info
+{
+ long long CIR;
+ long long CBS;
+ long long refill_duration_ms;
+ long long consumed;
+ long long refilled;
+ long long available;
+ long long number_of_consumers;
+};
+void ftb_throttler_info(struct ftb_throttler *t, struct ftb_throttler_info *info);
+void ftb_throttler_free(struct ftb_throttler *t);
+
+enum ftb_throttler_command
+{
+ FTBT_CMD_CONSUME_NORMAL,
+ FTBT_CMD_CONSUME_FLEXIBLE,
+};
+long long ftb_throttler_control(struct ftb_throttler *t, struct timeval now, enum ftb_throttler_command cmd, long long tokens, const char *req_class_id, unsigned char weight);
+
+void ftb_throttler_serialize(struct ftb_throttler *t, char **blob, size_t *blob_sz);
+struct ftb_throttler *ftb_throttler_deserialize(const char *blob, size_t blob_sz);
+
+void ftb_throttler_merge(struct ftb_throttler *src, struct ftb_throttler *dst);
+void ftb_throttler_merge_blob(struct ftb_throttler *t, const char *blob, size_t blob_sz);
+
+#ifdef __cplusplus
+}
+#endif
+
diff --git a/CRDT/ftbf_throttle.h b/CRDT/ftbf_throttle.h
deleted file mode 100644
index e378397..0000000
--- a/CRDT/ftbf_throttle.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-#include <stddef.h>
-#include <sys/time.h>
-#include <uuid/uuid.h>
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-struct ftbf_throttle;
-struct ftbf_throttle *ftbf_throttle_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS, long long alloc_tokens);
-
-void ftbf_throttle_configure(struct ftbf_throttle *t, struct timeval now, long long CIR, long long CBS, long long refill_duration_ms);
-struct ftbf_throttle_info
-{
- long long CIR;
- long long CBS;
- long long refill_duration_ms;
- long long consumed;
- long long refilled;
- long long available;
- long long number_of_consumers;
-};
-void ftbf_throttle_info(struct ftbf_throttle *t, struct ftbf_throttle_info *info);
-void ftbf_throttle_free(struct ftbf_throttle *t);
-
-enum ftbf_throttle_command
-{
- FTBF_CMD_CONSUME_NORMAL,
- FTBF_CMD_CONSUME_FLEXIBLE,
-};
-long long ftbf_throttle_control(struct ftbf_throttle *t, struct timeval now, enum ftbf_throttle_command cmd, long long tokens, const char *req_class_id, unsigned char weight);
-
-void ftbf_throttle_serialize(struct ftbf_throttle *t, char **blob, size_t *blob_sz);
-struct ftbf_throttle *ftbf_throttle_deserialize(const char *blob, size_t blob_sz);
-
-void ftbf_throttle_merge(struct ftbf_throttle *src, struct ftbf_throttle *dst);
-void ftbf_throttle_merge_blob(struct ftbf_throttle *t, const char *blob, size_t blob_sz);
-
-#ifdef __cplusplus
-}
-#endif
-
diff --git a/CRDT/ftbf_gtest.cpp b/CRDT/ftbt_gtest.cpp
index 1e61212..4c536bd 100644
--- a/CRDT/ftbf_gtest.cpp
+++ b/CRDT/ftbt_gtest.cpp
@@ -1,4 +1,4 @@
-#include "ftbf_throttle.h"
+#include "ftb_throttler.h"
#include "crdt_utils.h"
#include "ransampl.h"
#include <cstddef>
@@ -10,63 +10,63 @@
#include "util/flow_tokens.h"
#include "util/fair_index.h"
-void ftbf_throttle_sync(struct ftbf_throttle *list[], size_t n)
+void ftbt_throttle_sync(struct ftb_throttler *list[], size_t n)
{
char *blob=NULL;
size_t blob_sz=0;
for(size_t i=0; i<n; i++)
{
- ftbf_throttle_serialize(list[i], &blob, &blob_sz);
+ ftb_throttler_serialize(list[i], &blob, &blob_sz);
for(size_t j=0; j<n; j++)
{
if(j==i) continue;
- ftbf_throttle_merge_blob(list[j], blob, blob_sz);
+ ftb_throttler_merge_blob(list[j], blob, blob_sz);
}
free(blob);
blob=NULL;
}
}
-TEST(FTBF, Basic)
+TEST(FTBT, Basic)
{
uuid_t uuid;
uuid_generate(uuid);
- struct ftbf_throttle *bucket=NULL;
- struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1);
+ struct ftb_throttler *bucket=NULL;
+ struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1);
long long CIR=100;
long long CBS=200;
struct timeval now;
gettimeofday(&now, NULL);
- bucket=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10);
+ bucket=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10);
long long rtn = 0;
- rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 60, "1", 1);
- ftbf_throttle_info(bucket, info);
+ rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 60, "1", 1);
+ ftb_throttler_info(bucket, info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
- rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_FLEXIBLE, 200, "1", 1);
- ftbf_throttle_info(bucket, info);
+ rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_FLEXIBLE, 200, "1", 1);
+ ftb_throttler_info(bucket, info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
- rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 60, "1", 1);
- ftbf_throttle_info(bucket, info);
+ rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 60, "1", 1);
+ ftb_throttler_info(bucket, info);
EXPECT_EQ(rtn, 0);
EXPECT_EQ(info->available, 200);
- ftbf_throttle_free(bucket);
+ ftb_throttler_free(bucket);
}
-TEST(FTBF, BasicWeighted)
+TEST(FTBT, BasicWeighted)
{
uuid_t uuid;
uuid_generate(uuid);
- struct ftbf_throttle *bucket=NULL;
- struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1);
+ struct ftb_throttler *bucket=NULL;
+ struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1);
long long CIR=100*1000; //100*1000 bps
long long CBS=200;
struct timeval now, step;
@@ -75,34 +75,34 @@ TEST(FTBF, BasicWeighted)
step.tv_sec=0;
step.tv_usec=(suseconds_t)1000;
- bucket=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10);
+ bucket=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10);
long long rtn = 0;
- rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 200, "1", 1); //active list : 200
- ftbf_throttle_info(bucket, info);
+ rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 200, "1", 1); //active list : 200
+ ftb_throttler_info(bucket, info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
- rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 160, "2", 2);//active list : 200 160
- ftbf_throttle_info(bucket, info);
+ rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 160, "2", 2);//active list : 200 160
+ ftb_throttler_info(bucket, info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
timeradd(&now, &step, &now);
- rtn=ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_NORMAL, 20, "1", 1);//active list : 130 30
- ftbf_throttle_info(bucket, info);
+ rtn=ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_NORMAL, 20, "1", 1);//active list : 130 30
+ ftb_throttler_info(bucket, info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 100);
- ftbf_throttle_free(bucket);
+ ftb_throttler_free(bucket);
}
-TEST(FTBF, Merge)
+TEST(FTBT, Merge)
{
uuid_t uuid;
- struct ftbf_throttle *bucket[2];
- struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1);
+ struct ftb_throttler *bucket[2];
+ struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1);
long long CIR=100*1000; //100*1000 bps
long long CBS=200;
long long rtn = 0;
@@ -113,39 +113,39 @@ TEST(FTBF, Merge)
step.tv_usec=(suseconds_t)1000;
uuid_generate(uuid);
- bucket[0]=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10);
+ bucket[0]=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10);
uuid_generate(uuid);
- bucket[1]=ftbf_throttle_new(uuid, now, CIR, CBS, CIR/10);
+ bucket[1]=ftb_throttler_new(uuid, now, CIR, CBS, CIR/10);
- rtn=ftbf_throttle_control(bucket[0], now, FTBF_CMD_CONSUME_NORMAL, 200, "1", 1);
- ftbf_throttle_info(bucket[0], info);
+ rtn=ftb_throttler_control(bucket[0], now, FTBT_CMD_CONSUME_NORMAL, 200, "1", 1);
+ ftb_throttler_info(bucket[0], info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
- rtn=ftbf_throttle_control(bucket[1], now, FTBF_CMD_CONSUME_NORMAL, 50, "2", 1);
- ftbf_throttle_info(bucket[1], info);
+ rtn=ftb_throttler_control(bucket[1], now, FTBT_CMD_CONSUME_NORMAL, 50, "2", 1);
+ ftb_throttler_info(bucket[1], info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
timeradd(&now, &step, &now);
- rtn=ftbf_throttle_control(bucket[0], now, FTBF_CMD_CONSUME_NORMAL, 0, "1", 1); //300-200=100
- ftbf_throttle_info(bucket[0], info);
+ rtn=ftb_throttler_control(bucket[0], now, FTBT_CMD_CONSUME_NORMAL, 0, "1", 1); //300-200=100
+ ftb_throttler_info(bucket[0], info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 100);
- rtn=ftbf_throttle_control(bucket[1], now, FTBF_CMD_CONSUME_NORMAL, 0, "2", 1);//300-50=250(200)
- ftbf_throttle_info(bucket[1], info);
+ rtn=ftb_throttler_control(bucket[1], now, FTBT_CMD_CONSUME_NORMAL, 0, "2", 1);//300-50=250(200)
+ ftb_throttler_info(bucket[1], info);
EXPECT_EQ(rtn, 1);
EXPECT_EQ(info->available, 200);
- ftbf_throttle_sync(bucket, 2);
+ ftbt_throttle_sync(bucket, 2);
- ftbf_throttle_info(bucket[0], info);
+ ftb_throttler_info(bucket[0], info);
EXPECT_EQ(info->available, 50);
- ftbf_throttle_info(bucket[1], info);
+ ftb_throttler_info(bucket[1], info);
EXPECT_EQ(info->available, 50);
- ftbf_throttle_free(bucket[0]);
- ftbf_throttle_free(bucket[1]);
+ ftb_throttler_free(bucket[0]);
+ ftb_throttler_free(bucket[1]);
}
#define FAIR_INDEX_CIR 1*1024*1000 //1024 bit/ms
@@ -153,7 +153,7 @@ TEST(FTBF, Merge)
#define FAIR_INDEX_REQ FAIR_INDEX_CIR/1000
#define FAIR_INDEX_MIMIC_DURATION_US 1*1000*1000
-TEST(FTBF, Pareto)
+TEST(FTBT, Pareto)
{
int class_num = 24;
enum flow_type ftype = PARETO;
@@ -161,9 +161,9 @@ TEST(FTBF, Pareto)
uuid_generate(uuid);
struct timeval start;
gettimeofday(&start, NULL);
- struct ftbf_throttle_info *info = ALLOC(struct ftbf_throttle_info, 1);
- struct ftbf_throttle *bucket=NULL;
- bucket=ftbf_throttle_new(uuid, start, FAIR_INDEX_CIR, FAIR_INDEX_CBS, FAIR_INDEX_CIR/1000/10);
+ struct ftb_throttler_info *info = ALLOC(struct ftb_throttler_info, 1);
+ struct ftb_throttler *bucket=NULL;
+ bucket=ftb_throttler_new(uuid, start, FAIR_INDEX_CIR, FAIR_INDEX_CBS, FAIR_INDEX_CIR/1000/10);
size_t i,j,k;
long long tokens=0;
long long consumed_class[class_num];
@@ -200,7 +200,7 @@ TEST(FTBF, Pareto)
tokens = 0.2*FAIR_INDEX_REQ+rand()%5;
requested += tokens;
sprintf(str, "%zu", j);
- int rtn = ftbf_throttle_control(bucket, now, FTBF_CMD_CONSUME_FLEXIBLE, tokens, str, class_weight[j]);
+ int rtn = ftb_throttler_control(bucket, now, FTBT_CMD_CONSUME_FLEXIBLE, tokens, str, class_weight[j]);
ftbf_result.cnt++;
ftbf_class_results[j].cnt++;
if(rtn > 0)
@@ -216,7 +216,7 @@ TEST(FTBF, Pareto)
}
upper_limit=FAIR_INDEX_CBS+FAIR_INDEX_CIR/1000*timeval_delta_ms(start, now);
- ftbf_throttle_info(bucket, info);
+ ftb_throttler_info(bucket, info);
refilled=info->refilled;
for(i=0; i<(size_t)class_num; ++i)
{
@@ -235,7 +235,8 @@ TEST(FTBF, Pareto)
consumed,
jain_fair_index);
ransampl_free(ws);
- ftbf_throttle_free(bucket);
+ ftb_throttler_free(bucket);
+ EXPECT_NEAR(jain_fair_index, 1, 0.01);
}