summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng Chao <[email protected]>2023-06-03 21:35:37 +0800
committer郑超 <[email protected]>2023-06-07 12:13:37 +0000
commit052cafc02059fd0a9cc2cf49ccf09dc8c4018a77 (patch)
treeed42f58224383ef1be60ecde5dd99ad08b2672e1
parent154e161a45d83957a67d8b1cbe04b3ce22b624cb (diff)
WIP
-rw-r--r--CRDT/bulk_token_bucket.c2
-rw-r--r--CRDT/crdt_base_gtest.cpp39
-rw-r--r--CRDT/crdt_tb_gtest.cpp176
-rw-r--r--CRDT/fair_token_bucket.c61
-rw-r--r--CRDT/g_array.c89
-rw-r--r--CRDT/oc_token_bucket.c39
-rw-r--r--CRDT/oc_token_bucket.h6
-rw-r--r--CRDT/token_bucket_common.c29
-rw-r--r--CRDT/token_bucket_common.h5
-rw-r--r--docs/design.md1
10 files changed, 324 insertions, 123 deletions
diff --git a/CRDT/bulk_token_bucket.c b/CRDT/bulk_token_bucket.c
index 9b1a1e3..679a3d1 100644
--- a/CRDT/bulk_token_bucket.c
+++ b/CRDT/bulk_token_bucket.c
@@ -140,7 +140,7 @@ long long bulk_token_bucket_consume(struct bulk_token_bucket *btb, struct timeva
}
else
{
- assigned=tb_consume(btb->cfg.CIR, global_available, n_replica, cmd, tokens);
+ assigned=tb_consume_reserve_based(btb->cfg.CIR, global_available, n_replica, cmd, tokens);
}
if(new_refilled!=refilled)
diff --git a/CRDT/crdt_base_gtest.cpp b/CRDT/crdt_base_gtest.cpp
index 2dfba64..483d501 100644
--- a/CRDT/crdt_base_gtest.cpp
+++ b/CRDT/crdt_base_gtest.cpp
@@ -1773,6 +1773,45 @@ TEST(GArray, Basic)
g_array_free(a[i]);
}
}
+TEST(GArray, Reset)
+{
+ uuid_t uuid;
+ size_t n_replica=2;
+ int array_sz=1024, round=100;
+ struct g_array *a[n_replica];
+ for(size_t i=0; i<n_replica; i++)
+ {
+ uuid_generate(uuid);
+ a[i]=g_array_new(uuid, array_sz);
+ }
+ long long idx=1, value=0;
+ srandom(17);
+ for(size_t i=0; i<round; i++)
+ {
+ g_array_incrby(a[0], idx, 100);
+ g_array_incrby(a[1], idx, 100);
+ g_array_sync(a, n_replica);
+ value=g_array_get(a[0], idx);
+ EXPECT_EQ(value, 200);
+ g_array_reset(a[0]);
+ value=g_array_get(a[0], idx);
+ EXPECT_EQ(value, 0);
+ g_array_incrby(a[1], idx, 1);
+ g_array_sync(a, n_replica);
+ //Observed Reset
+ value=g_array_get(a[0], idx);
+ EXPECT_EQ(value, 1);
+ value=g_array_get(a[1], idx);
+ EXPECT_EQ(value, 1);
+ idx++;
+ }
+
+
+ for(size_t i=0; i<n_replica; i++)
+ {
+ g_array_free(a[i]);
+ }
+}
TEST(GArray, Serialize)
{
uuid_t uuid;
diff --git a/CRDT/crdt_tb_gtest.cpp b/CRDT/crdt_tb_gtest.cpp
index f91ac23..8b3b3ec 100644
--- a/CRDT/crdt_tb_gtest.cpp
+++ b/CRDT/crdt_tb_gtest.cpp
@@ -36,10 +36,10 @@ enum traffic_type
HEAVY_TWO_EIGHT_TYPE,
HEAVY_UNIFORM_EXTREME_TYPE
};
-const int REPLICA_NUMBER=3;
+
long long get_request_tokens(int index, enum traffic_type type, long long step_us, long long CIR, long long CBS)
{
-
+ const int REPLICA_NUMBER=3;
long long request_size=0;
long long standard = CIR * step_us / 1000000;
long long sd10 = floor((long double)standard * 0.1);
@@ -95,6 +95,7 @@ long long get_request_tokens(int index, enum traffic_type type, long long step_u
void traffic_distribution(traffic_type type)
{
+ const int REPLICA_NUMBER=3;
struct OC_token_bucket *buckets[REPLICA_NUMBER];
size_t i = 0, j = 0;
long long CIR = 1*1024*1024;
@@ -108,7 +109,7 @@ void traffic_distribution(traffic_type type)
buckets[i] = OC_token_bucket_new(uuid, start, CIR, CBS);
}
long long tokens = 0, flexible_tokens = 0;
- long long consumed = 0, requested = 0, upper_limit = 0, refilled = 0;
+ long long consumed = 0, requested = 0, upper_limit = 0;
long long mimic_duration_us = (long long)100*1000*1000;
long long step_us = 100;
struct timeval step, now;
@@ -131,10 +132,9 @@ void traffic_distribution(traffic_type type)
upper_limit = CBS + CIR * timeval_delta_ms(start, now) / 1000;
struct OC_token_bucket_info info;
OC_token_bucket_info(buckets[0], now, &info);
- refilled=info.refilled;
EXPECT_LE(consumed, requested);
- double accuracy = (double)consumed / MIN(refilled, requested);
- EXPECT_NEAR(accuracy, 1, 0.01);
+ double accuracy = (double)consumed / MIN(upper_limit, requested);
+ EXPECT_NEAR(accuracy, 1, 0.02);
printf("accuracy:%f, upper_limit:%lld, refilled:%lld, requested:%lld, consumed:%lld\n",
accuracy, upper_limit, info.refilled, requested, consumed);
for(i = 0; i < REPLICA_NUMBER; i++)
@@ -300,7 +300,8 @@ TEST(OCTokenBucket, Merge)
OC_token_bucket_sync(buckets, 2);
now.tv_sec++;
- //each has 200-130-30+100/2=90 avaliable tokens
+ //available=200-130-30+100=140
+ //reserved=CIR/2=50, so local avaliable is 90
tokens=OC_token_bucket_consume(buckets[0], now, TB_CONSUME_NORMAL, 91);
EXPECT_EQ(tokens, 0);
tokens=OC_token_bucket_consume(buckets[1], now, TB_CONSUME_FLEXIBLE, 90);
@@ -369,10 +370,10 @@ double OC_token_bucket_test(size_t replica_num, long long mimic_duration_s, long
TEST(OCTokenBucket, HeavyConsumer)
{
double accuracy=0.0;
- long long replica_num=0, test_duration_s=500, sync_interval_ms=0;
+ long long replica_num=0, test_duration_s=200, sync_interval_ms=100;
long long i=0, j=0;
- for(i=8; i<9; i++)
+ for(i=6; i<9; i++)
{
replica_num=i+1;
for(j=0; j<10; j++)
@@ -395,7 +396,7 @@ TEST(OCTokenBucket, LightConsumer)
replica_num=i+1;
for(j=0; j<20; j++)
{
- sync_interval_ms=100*(j+1);
+ sync_interval_ms=10*(j+1);
accuracy=OC_token_bucket_test(replica_num, test_duration_s, sync_interval_ms, 90);
EXPECT_NEAR(accuracy, 1, 0.03);
}
@@ -403,6 +404,7 @@ TEST(OCTokenBucket, LightConsumer)
}
TEST(OCTokenBucket, ConcurrentHeavyConsumer)
{
+ const int REPLICA_NUMBER=3;
struct OC_token_bucket *buckets[REPLICA_NUMBER];
size_t i=0, j=0;
long long CIR=100*1024*1024;
@@ -434,7 +436,7 @@ TEST(OCTokenBucket, ConcurrentHeavyConsumer)
requested+=tokens;
consumed+=OC_token_bucket_consume(buckets[j], now, TB_CONSUME_FLEXIBLE, tokens);
}
- if(i%100==0)
+ if(i%80==0)
{
OC_token_bucket_sync(buckets, REPLICA_NUMBER);
}
@@ -454,8 +456,63 @@ TEST(OCTokenBucket, ConcurrentHeavyConsumer)
OC_token_bucket_free(buckets[i]);
}
}
+TEST(OCTokenBucket, MixTypeConsumer)
+{
+ const int n_replica=2;
+ struct OC_token_bucket *buckets[n_replica];
+ size_t i=0, j=0;
+ long long CIR=20*1024*1024;
+ long long CBS=40*1024*1024;
+ uuid_t uuid;
+ struct timeval start;
+ gettimeofday(&start, NULL);
+ for(i=0; i<n_replica; i++)
+ {
+ uuid_generate(uuid);
+ buckets[i]=OC_token_bucket_new(uuid, start, CIR, CBS);
+ }
+ srandom(17);
+ long long tokens=0;
+ long long consumed=0, requested=0, upper_limit=0, refilled=0;
+ long long mimic_duration_us=(long long)100*1000*1000;
+ long long step_us=100;
+ struct timeval step, now;
+ memcpy(&now, &start, sizeof(now));
+ step.tv_sec=0;
+ step.tv_usec=(suseconds_t)step_us;
+ for(i=0; (long long)i<mimic_duration_us/step_us; i++)
+ {
+ timeradd(&now, &step, &now);
+ for(j=0; j<n_replica; j++)
+ {
+ tokens=CIR*step_us/(1000*1000);
+ tokens=MAX(3*j*tokens/n_replica, 1);
+ requested+=tokens;
+ consumed+=OC_token_bucket_consume(buckets[j%n_replica], now, TB_CONSUME_FLEXIBLE, tokens);
+ }
+ if(i%80==0)
+ {
+ OC_token_bucket_sync(buckets, n_replica);
+ }
+ }
+ upper_limit=CBS+CIR*timeval_delta_ms(start, now)/1000;
+ struct OC_token_bucket_info info;
+ OC_token_bucket_info(buckets[0], now, &info);
+ refilled=info.refilled;
+
+ EXPECT_NEAR((double)consumed/MIN(refilled, requested), 1, 0.01);
+ EXPECT_NEAR((double)refilled/upper_limit, 1, 0.01);
+ EXPECT_LE(consumed, requested);
+ double accuracy=(double)consumed/MIN(upper_limit, requested);
+ EXPECT_NEAR(accuracy, 1, 0.01);
+ for(i=0; i<n_replica; i++)
+ {
+ OC_token_bucket_free(buckets[i]);
+ }
+}
TEST(OCTokenBucket, Reconfigure)
{
+ const int REPLICA_NUMBER=3;
struct OC_token_bucket *buckets[REPLICA_NUMBER];
size_t i=0, j=0;
double accuracy=0.0;
@@ -542,6 +599,7 @@ TEST(OCTokenBucket, Reconfigure)
}
TEST(OCTokenBucket, PartitionTolerance)
{
+ const int REPLICA_NUMBER=3;
struct OC_token_bucket *buckets[REPLICA_NUMBER];
size_t i=0, j=0;
long long CIR=1*1024*1024;
@@ -689,10 +747,11 @@ void ftb_sync(struct fair_token_bucket **ftb, size_t n_ftb)
}
return;
}
+double expected_fairness_index=0.03;
double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long long CIR, long long CBS, int duration_s, int n_replica)
{
uuid_t uuid;
- uuid_generate(uuid);
+
struct timeval start, step, now;
int step_us=1000;
gettimeofday(&start, NULL);
@@ -700,11 +759,13 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l
step.tv_sec=0;
step.tv_usec=(suseconds_t)step_us;
struct fair_token_bucket *ftb[n_replica];
- for(int i=0; i<n_replica; i++)
+ for(int i=0; i<n_replica+1; i++)
{
+ uuid_generate(uuid);
ftb[i]=fair_token_bucket_new(uuid, now, CIR, CBS, 8192);
}
- int sync_interval_ms=400;
+ int sync_interval_ms=40;
+ long long got=0, got1=0;
for(int i=0; i<duration_s*(1000*1000/step_us); i++)
{
timeradd(&now, &step, &now);
@@ -714,13 +775,25 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l
{
int idx=(j+k)%n_class;
long long this_demand=classes[idx].requested_CIR*step_us/(1000*1000);
- classes[idx].allocated_tokens+=fair_token_bucket_consume(ftb[r], now,
+ got=fair_token_bucket_consume(ftb[r], now,
(const char*) &(classes[idx].class_id),
sizeof(classes[idx].class_id),
classes[idx].weight,
TB_CONSUME_NORMAL,
this_demand);
+ got1=fair_token_bucket_consume(ftb[n_replica], now,
+ (const char*) &(classes[idx].class_id),
+ sizeof(classes[idx].class_id),
+ classes[idx].weight,
+ TB_CONSUME_NORMAL,
+ this_demand);
+ if(got>got1)
+ {
+ //printf("%lld %lld\n", got, got1);
+ }
+ classes[idx].allocated_tokens+=got;
classes[idx].demand_tokens+=this_demand;
+
}
if(0==i%(sync_interval_ms*1000/step_us))
{
@@ -729,7 +802,7 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l
}
long long available_tokens=CIR*duration_s+CBS;
double index=max_min_fairness_index(available_tokens, classes, n_class);
- int print=0;
+ int print=1;
if(print)
{
printf("class\tweight\tdemand\tallocated\tideal\r\n");
@@ -756,15 +829,15 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l
TEST(FairTokenBucket, Basic)
{
double index=0;
- long long duration_s=350;
+ long long duration_s=20;
struct sftb_class one_heavy_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 1, 20000, 0, 0, 0},
{3, 1, 20000, 0, 0, 0},
{4, 1, 20000, 0, 0, 0},
{5, 1, 50000, 0, 0, 0}};
- index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, 2);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
struct sftb_class all_light_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 1, 20000, 0, 0, 0},
@@ -773,7 +846,7 @@ TEST(FairTokenBucket, Basic)
{5, 1, 20000, 0, 0, 0}};
index=test_fair_token_bucket(all_light_classes, 5, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
struct sftb_class two_heavy_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 1, 20000, 0, 0, 0},
@@ -781,8 +854,8 @@ TEST(FairTokenBucket, Basic)
{400, 1, 50000, 0, 0, 0},
{5, 1, 20003, 0, 0, 0}};
- index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 2);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
struct sftb_class all_heavy_classes[5]={{1, 1, 40000, 0, 0, 0},
{2, 1, 40000, 0, 0, 0},
@@ -791,21 +864,35 @@ TEST(FairTokenBucket, Basic)
{5, 1, 40000, 0, 0, 0}};
index=test_fair_token_bucket(all_heavy_classes, 5, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
}
+TEST(FairTokenBucket, Debug)
+{
+ double index=0;
+ long long duration_s=20;
+ struct sftb_class two_heavy_classes[5]={{1, 1, 20000, 0, 0, 0},
+ {2, 1, 20000, 0, 0, 0},
+ {3, 1, 50000, 0, 0, 0},
+ {400, 1, 50000, 0, 0, 0},
+ {5, 1, 20003, 0, 0, 0}};
+
+ index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 2);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
+}
TEST(FairTokenBucket, Replicas)
{
double index=0;
long long duration_s=350;
+ int n_replica=2;
struct sftb_class one_heavy_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 1, 20000, 0, 0, 0},
{3, 1, 20000, 0, 0, 0},
{4, 1, 20000, 0, 0, 0},
{5, 1, 50000, 0, 0, 0}};
- index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, 2);
- EXPECT_NEAR(index, 1, 0.02);
+ index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
struct sftb_class two_heavy_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 1, 20000, 0, 0, 0},
@@ -813,22 +900,23 @@ TEST(FairTokenBucket, Replicas)
{400, 1, 50000, 0, 0, 0},
{5, 1, 20003, 0, 0, 0}};
- index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 2);
- EXPECT_NEAR(index, 1, 0.02);
+ index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
}
TEST(FairTokenBucket, Weight)
{
double index=0;
long long duration_s=500;
+ int n_replica=3;
struct sftb_class one_heavy_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 2, 20000, 0, 0, 0},
{3, 3, 20000, 0, 0, 0},
{4, 4, 30000, 0, 0, 0},
{5, 5, 50000, 0, 0, 0}};
- index=test_fair_token_bucket(one_heavy_classes, 5, 120000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ double one_heavy_index=test_fair_token_bucket(one_heavy_classes, 5, 120000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(one_heavy_index, 1, expected_fairness_index);
struct sftb_class t1_heavy_classes[5]={{1, 1, 40000, 0, 0, 0},
{2, 2, 20000, 0, 0, 0},
@@ -836,8 +924,8 @@ TEST(FairTokenBucket, Weight)
{4, 4, 30000, 0, 0, 0},
{5, 5, 50000, 0, 0, 0}};
- index=test_fair_token_bucket(t1_heavy_classes, 5, 120000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ double t1_heavy_index=test_fair_token_bucket(t1_heavy_classes, 5, 120000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(t1_heavy_index, 1, expected_fairness_index);
struct sftb_class t2_heavy_classes[5]={{1, 1, 40000, 0, 0, 0},
{2, 2, 20000, 0, 0, 0},
@@ -845,8 +933,8 @@ TEST(FairTokenBucket, Weight)
{4, 4, 30000, 0, 0, 0},
{5, 10, 50000, 0, 0, 0}};
- index=test_fair_token_bucket(t2_heavy_classes, 5, 120000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ double t2_heavy_index=test_fair_token_bucket(t2_heavy_classes, 5, 120000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(t2_heavy_index, 1, 0.02);
struct sftb_class t3_heavy_classes[5]={{1, 1, 40000, 0, 0, 0},
{2, 2, 20000, 0, 0, 0},
@@ -854,8 +942,8 @@ TEST(FairTokenBucket, Weight)
{4, 4, 20000, 0, 0, 0},
{5, 10, 10000, 0, 0, 0}};
- index=test_fair_token_bucket(t3_heavy_classes, 5, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ double t3_heavy_index=test_fair_token_bucket(t3_heavy_classes, 5, 100000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(t3_heavy_index, 1, expected_fairness_index);
struct sftb_class all_light_classes[5]={{1, 1, 20000, 0, 0, 0},
{2, 2, 20000, 0, 0, 0},
@@ -863,8 +951,8 @@ TEST(FairTokenBucket, Weight)
{4, 4, 20000, 0, 0, 0},
{5, 10, 23000, 0, 0, 0}};
- index=test_fair_token_bucket(all_light_classes, 5, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ index=test_fair_token_bucket(all_light_classes, 5, 100000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
struct sftb_class light_with_big_player[5]={{1, 1, 10000, 0, 0, 0},
{2, 2, 10000, 0, 0, 0},
@@ -872,10 +960,10 @@ TEST(FairTokenBucket, Weight)
{4, 4, 10000, 0, 0, 0},
{5, 10, 60000, 0, 0, 0}};
- index=test_fair_token_bucket(light_with_big_player, 5, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ double all_light_index=test_fair_token_bucket(light_with_big_player, 5, 100000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(all_light_index, 1, expected_fairness_index);
- struct sftb_class sandvine_active_logic[10]={{1, 5, 30000, 0, 0, 0},
+ struct sftb_class sd_al[10]={{1, 5, 30000, 0, 0, 0},
{2, 5, 30000, 0, 0, 0},
{3, 3, 12000, 0, 0, 0},
{4, 3, 12000, 0, 0, 0},
@@ -886,8 +974,8 @@ TEST(FairTokenBucket, Weight)
{9, 2, 9000, 0, 0, 0},
{10, 2, 20000, 0, 0, 0}};
- index=test_fair_token_bucket(sandvine_active_logic, 10, 100000, 200000, duration_s, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ double sd_al_index=test_fair_token_bucket(sd_al, 10, 100000, 200000, duration_s, n_replica);
+ EXPECT_NEAR(sd_al_index, 1, expected_fairness_index);
}
TEST(FairTokenBucket, Weight5000)
{
@@ -904,7 +992,7 @@ TEST(FairTokenBucket, Weight5000)
very_heavy_classes[i].weight=i%20+1;
}
double index=test_fair_token_bucket(very_heavy_classes, n_class, CIR, CBS, 40, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
struct sftb_class slight_heavy_classes[n_class];
memset(slight_heavy_classes, 0, sizeof(slight_heavy_classes));
@@ -915,7 +1003,7 @@ TEST(FairTokenBucket, Weight5000)
slight_heavy_classes[i].weight=i%20+1;
}
index=test_fair_token_bucket(slight_heavy_classes, n_class, CIR, CBS, 40, 1);
- EXPECT_NEAR(index, 1, 0.02);
+ EXPECT_NEAR(index, 1, expected_fairness_index);
}
struct btb_key
diff --git a/CRDT/fair_token_bucket.c b/CRDT/fair_token_bucket.c
index 66e3976..d38dcec 100644
--- a/CRDT/fair_token_bucket.c
+++ b/CRDT/fair_token_bucket.c
@@ -11,7 +11,7 @@
#include <stdint.h>
-#define REFILL_INTERVAL_MS 200
+#define PERTUB_INTERVAL_MS 100
struct fair_token_bucket
@@ -25,9 +25,13 @@ struct fair_token_bucket
struct OC_token_bucket *bucket;
/* Local Variables*/
- struct timeval last_refill_time;
- long long per_weight_quantum; //updated every REFILL_INTERVAL_MS
- long long n_active_key;
+ struct timeval last_pertub;
+ long long per_weight_quantum; //updated every PERTUB_INTERVAL_MS
+ long long total_weight;
+ long long n_active_key;
+ long long debug_last_defict;
+ long long debug_last_allocated;
+ long long debug_consume_call;
};
struct fair_token_bucket *fair_token_bucket_new(uuid_t uuid, struct timeval now, long long CIR, long long CBS, long long divisor)
{
@@ -41,6 +45,8 @@ struct fair_token_bucket *fair_token_bucket_new(uuid_t uuid, struct timeval now,
}
ftb->sfq=g_array_new(uuid, ftb->divisor);
ftb->bucket=OC_token_bucket_new(uuid, now, CIR, CBS);
+ OC_token_bucket_set_no_reserve(ftb->bucket);
+ ftb->per_weight_quantum=CIR;
return ftb;
}
void fair_token_bucket_configure(struct fair_token_bucket *ftb, struct timeval now, long long CIR, long long CBS, long long divisor)
@@ -64,13 +70,8 @@ void fair_token_bucket_free(struct fair_token_bucket *ftb)
free(ftb);
}
-static void ftb_refill(struct fair_token_bucket *ftb, struct timeval now)
+static void ftb_quantum_estimation(struct fair_token_bucket *ftb, struct timeval now)
{
- long long delta_time_ms=timeval_delta_ms(ftb->last_refill_time, now);
- if(likely(delta_time_ms<REFILL_INTERVAL_MS))
- {
- return;
- }
//Per weight quantum estimation
long long total_weight=0, n_active_key=0, count=0;
for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++)
@@ -81,21 +82,32 @@ static void ftb_refill(struct fair_token_bucket *ftb, struct timeval now)
}
struct OC_token_bucket_info info;
OC_token_bucket_info(ftb->bucket, now, &info);
- long long available=info.available;
+ long long available=info.available;
+ //available=MAX(available-info.CIR*(2-1)/2, 0);
+
+ ftb->total_weight=total_weight;
ftb->per_weight_quantum=available/MAX(1, total_weight);
ftb->n_active_key=n_active_key;
-
- memcpy(&ftb->last_refill_time, &now, sizeof(ftb->last_refill_time));
+}
+static void ftb_perturb(struct fair_token_bucket *ftb, struct timeval now)
+{
+ ftb_quantum_estimation(ftb, now);
g_array_reset(ftb->sfq);
ftb->perturb_seed++;
}
-
long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeval now, const char *key, size_t keylen, long long weight, enum tb_consume_type cmd, long long tokens)
{
+ ftb->debug_consume_call++;
if(weight>FAIR_TB_WEIGHT_MAX || weight<0)
return -1;
ST_hyperloglog_add(ftb->hll[weight-1], key, keylen, now);
- ftb_refill(ftb, now);
+
+ long long delta_time_ms=timeval_delta_ms(ftb->last_pertub, now);
+ if(unlikely(delta_time_ms>=PERTUB_INTERVAL_MS))
+ {
+ ftb_perturb(ftb, now);
+ memcpy(&ftb->last_pertub, &now, sizeof(ftb->last_pertub));
+ }
int sfq_idx=0;
sfq_idx=XXH3_64bits_withSeed(key, keylen, ftb->perturb_seed)%ftb->divisor;
@@ -103,12 +115,18 @@ long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeva
long long deficit_est=0;
deficit_est=g_array_get(ftb->sfq, sfq_idx);
deficit_est/=MAX(1, ftb->n_active_key/ftb->divisor);
+ ftb->debug_last_defict=deficit_est;
if(tokens + deficit_est > ftb->per_weight_quantum*weight)
{
+ ftb->debug_last_allocated=-1;
return 0;
}
long long allocated_tokens=OC_token_bucket_consume(ftb->bucket, now, cmd, tokens);
- if(allocated_tokens) g_array_incrby(ftb->sfq, sfq_idx, allocated_tokens);
+ if(allocated_tokens)
+ {
+ g_array_incrby(ftb->sfq, sfq_idx, allocated_tokens);
+ }
+ ftb->debug_last_allocated=allocated_tokens;
return allocated_tokens;
}
@@ -125,6 +143,7 @@ struct ftb_header
long long payload_sz;
long long divisor;
long long pertub_seed;
+ struct timeval last_pertub;
};
size_t fair_token_bucket_serialized_size(const struct fair_token_bucket *ftb)
{
@@ -147,7 +166,7 @@ void fair_token_bucket_serialize(const struct fair_token_bucket *ftb, char **blo
hdr.payload_sz=sz;
hdr.divisor=ftb->divisor;
hdr.pertub_seed=ftb->perturb_seed;
-
+ memcpy(&hdr.last_pertub, &ftb->last_pertub, sizeof(hdr.last_pertub));
char *buffer=ALLOC(char, sz);
memcpy(buffer+offset, &hdr, sizeof(hdr));
offset += sizeof(hdr);
@@ -185,6 +204,7 @@ struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t
struct fair_token_bucket *ftb=ALLOC(struct fair_token_bucket, 1);
ftb->perturb_seed=hdr.pertub_seed;
ftb->divisor=hdr.divisor;
+ memcpy(&ftb->last_pertub, &hdr.last_pertub, sizeof(ftb->last_pertub));
for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++)
{
ftb->hll[i]=ST_hyperloglog_deserialize(blob+offset, blob_sz-offset);
@@ -199,12 +219,17 @@ struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t
}
void fair_token_bucket_merge(struct fair_token_bucket *dst, const struct fair_token_bucket *src)
{
- dst->perturb_seed=MAX(dst->perturb_seed, src->perturb_seed);
dst->divisor=MAX(dst->divisor, src->divisor);
for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++)
{
ST_hyperloglog_merge(dst->hll[i], src->hll[i]);
}
+ if(timercmp(&dst->last_pertub, &src->last_pertub, <))//Last-Write-Wins
+ {
+ dst->perturb_seed=src->perturb_seed;
+ memcpy(&dst->last_pertub, &src->last_pertub, sizeof(dst->last_pertub));
+ ftb_quantum_estimation(dst, dst->last_pertub);
+ }
g_array_merge(dst->sfq, src->sfq);
OC_token_bucket_merge(dst->bucket, src->bucket);
return;
diff --git a/CRDT/g_array.c b/CRDT/g_array.c
index 2272bfc..f6cd7b3 100644
--- a/CRDT/g_array.c
+++ b/CRDT/g_array.c
@@ -7,14 +7,13 @@
#define TO_LOCAL_EPOCH(x) (x & ((1ULL << 24) - 1))
struct counter_item
{
- long long epoch:24;
- long long count:40;
+ long long count;
};
struct counter_array
{
uuid_t replica_id;
- long long array_sz;
long long sequence;
+ long long array_sz;
struct counter_item *array;
UT_hash_handle hh;
};
@@ -44,7 +43,7 @@ struct g_array *g_array_new(uuid_t my_id, long long array_sz)
struct g_array *ga=ALLOC(struct g_array, 1);
uuid_copy(ga->my_id, my_id);
ga->array_sz=array_sz;
- return ga;
+ return ga;
}
void g_array_free(struct g_array *ga)
{
@@ -57,15 +56,24 @@ void g_array_free(struct g_array *ga)
free(ga);
return;
}
-
+static void update_global_epoch(struct g_array *ga)
+{
+ struct counter_array *a=NULL, *tmp=NULL;
+ ga->epoch=0;
+ HASH_ITER(hh, ga->hash, a, tmp)
+ {
+ // ga->epoch+=a->epoch;
+ }
+ return;
+}
long long g_array_get(const struct g_array *ga, long long idx)
{
struct counter_array *a=NULL, *tmp=NULL;
long long value=0;
HASH_ITER(hh, ga->hash, a, tmp)
{
- if(idx < a->array_sz && a->array[idx].epoch == TO_LOCAL_EPOCH(ga->epoch))
- value += a->array[idx].count;
+ if(idx < a->array_sz)
+ value += a->array[idx].count;
}
return value;
}
@@ -80,18 +88,25 @@ long long g_array_incrby(struct g_array *ga, long long idx, long long increment)
HASH_ADD_KEYPTR(hh, ga->hash, a->replica_id, sizeof(a->replica_id), a);
}
assert(idx < a->array_sz);
- if(a->array[idx].epoch != TO_LOCAL_EPOCH(ga->epoch))
- {
- a->array[idx].epoch = TO_LOCAL_EPOCH(ga->epoch);
- a->array[idx].count = 0;
- }
a->array[idx].count += increment;
- a->sequence ++;
+ a->sequence++;
return g_array_get(ga, idx);
}
void g_array_reset(struct g_array *ga)
{
- ga->epoch++;
+ struct counter_array *a=NULL;
+
+ HASH_FIND(hh, ga->hash, ga->my_id, sizeof(ga->my_id), a);
+ if(!a)
+ {
+ a=counter_array_new(ga->my_id, ga->array_sz);
+ HASH_ADD_KEYPTR(hh, ga->hash, a->replica_id, sizeof(a->replica_id), a);
+ }
+ for(size_t i=0; i<ga->array_sz; i++)
+ {
+ a->array[i].count -= g_array_get(ga, i);
+ }
+ a->sequence++;
}
void g_array_resize(struct g_array *ga, long long new_size)
{
@@ -99,13 +114,12 @@ void g_array_resize(struct g_array *ga, long long new_size)
HASH_FIND(hh, ga->hash, ga->my_id, sizeof(ga->my_id), a);
if(a && a->array_sz < new_size)
{
- a->array=(struct counter_item *)realloc(a->array, sizeof(struct counter_item)*new_size);
- a->array_sz=new_size;
- a->sequence++;
+ a->array=(struct counter_item *)realloc(a->array, sizeof(struct counter_item)*new_size);
+ a->array_sz=new_size;
}
ga->array_sz=new_size;
- return;
+ return;
}
size_t g_array_replicas(const struct g_array *ga)
{
@@ -120,9 +134,9 @@ size_t g_array_serialized_size(const struct g_array *ga)
HASH_ITER(hh, ga->hash, item, tmp)
{
sz += G_ARRAY_ITEM_HEADER_SIZE;
- sz += item->array_sz*sizeof(struct counter_item);
+ sz += item->array_sz*sizeof(struct counter_item);
}
- return sz;
+ return sz;
}
void g_array_serialize(const struct g_array *ga, char **blob, size_t *blob_sz)
{
@@ -172,43 +186,44 @@ struct g_array * g_array_deserialize(const char *blob, size_t blob_sz)
assert(offset<=blob_sz);
return ga;
}
+
void g_array_merge(struct g_array *dst, const struct g_array *src)
{
- struct counter_array *src_item=NULL, *dst_item=NULL, *tmp=NULL;
+ struct counter_array *src_item=NULL, *dst_item=NULL, *tmp=NULL;
long long max_array_sz=0;
- HASH_ITER(hh, src->hash, src_item, tmp)
- {
- HASH_FIND(hh, dst->hash, src_item->replica_id, sizeof(src_item->replica_id), dst_item);
- if(!dst_item)
- {
- dst_item=ALLOC(struct counter_array, 1);
+ HASH_ITER(hh, src->hash, src_item, tmp)
+ {
+ HASH_FIND(hh, dst->hash, src_item->replica_id, sizeof(src_item->replica_id), dst_item);
+ if(!dst_item)
+ {
+ dst_item=ALLOC(struct counter_array, 1);
memcpy(dst_item, src_item, G_ARRAY_ITEM_HEADER_SIZE);
- dst_item->array=ALLOC(struct counter_item, dst_item->array_sz);
- memcpy(dst_item->array, src_item->array, dst_item->array_sz*sizeof(struct counter_item));
+ dst_item->array=ALLOC(struct counter_item, dst_item->array_sz);
+ memcpy(dst_item->array, src_item->array, dst_item->array_sz*sizeof(struct counter_item));
HASH_ADD_KEYPTR(hh, dst->hash, dst_item->replica_id, sizeof(dst_item->replica_id), dst_item);
- }
+ }
else
{
if(src_item->sequence > dst_item->sequence)
{
- dst_item->array_sz=src_item->array_sz;
+ memcpy(dst_item, src_item, G_ARRAY_ITEM_HEADER_SIZE);
dst_item->array=realloc(dst_item->array, sizeof(struct counter_item)*dst_item->array_sz);
memcpy(dst_item->array, src_item->array, sizeof(struct counter_item)*dst_item->array_sz);
}
}
max_array_sz=MAX(src_item->array_sz, max_array_sz);
- }
- dst->epoch=MAX(dst->epoch, src->epoch);
+ }
+ update_global_epoch(dst);
if(dst->array_sz<max_array_sz)
{
g_array_resize(dst, max_array_sz);
}
- return;
+ return;
}
void g_array_merge_blob(struct g_array *ga, const char *blob, size_t blob_sz)
{
struct g_array *src=g_array_deserialize(blob, blob_sz);
- g_array_merge(ga, src);
- g_array_free(src);
- return;
+ g_array_merge(ga, src);
+ g_array_free(src);
+ return;
} \ No newline at end of file
diff --git a/CRDT/oc_token_bucket.c b/CRDT/oc_token_bucket.c
index 502596c..1facf43 100644
--- a/CRDT/oc_token_bucket.c
+++ b/CRDT/oc_token_bucket.c
@@ -6,12 +6,12 @@
#include <assert.h>
#include <string.h>
-
+#define OCTB_REFILL_INTERVAL_MS_DEFAULT 10
struct OC_configuration
{
long long CIR; //Committed Information Rate
long long CBS; //Committed Burst Size
- long long refill_duration_ms;
+ long long refill_interval_ms;
struct timeval write_timestamp;
};
struct OC_token_bucket
@@ -20,6 +20,9 @@ struct OC_token_bucket
long long refilled;
struct timeval refill_timestamp;
struct PN_counter *consumed;
+
+ /* Local variables */
+ int no_reserve;
};
const size_t OCTB_BLOB_HDR_SIZE= offsetof(struct OC_token_bucket, consumed);
struct OC_token_bucket *OC_token_bucket_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS)
@@ -30,7 +33,7 @@ struct OC_token_bucket *OC_token_bucket_new(uuid_t my_id, struct timeval now, lo
memcpy(&bucket->cfg.write_timestamp, &now, sizeof(bucket->cfg.write_timestamp));
bucket->cfg.CIR=CIR;
bucket->cfg.CBS=CBS;
- bucket->cfg.refill_duration_ms=10;
+ bucket->cfg.refill_interval_ms=OCTB_REFILL_INTERVAL_MS_DEFAULT;
memcpy(&bucket->refill_timestamp, &now, sizeof(bucket->cfg.write_timestamp));
bucket->refilled=bucket->cfg.CBS;
return bucket;
@@ -48,7 +51,10 @@ void OC_token_bucket_configure(struct OC_token_bucket *bucket, struct timeval no
if(CIR>=0) bucket->cfg.CIR=CIR;
if(CBS>=0) bucket->cfg.CBS=CBS;
}
-
+void OC_token_bucket_set_no_reserve(struct OC_token_bucket *bucket)
+{
+ bucket->no_reserve=1;
+}
long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval now, enum tb_consume_type cmd, long long tokens)
{
@@ -60,7 +66,7 @@ long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval
long long new_refilled=0;
long long available=tb_available(bucket->cfg.CIR, bucket->cfg.CBS, consumed,
- refilled, delta_time_ms, bucket->cfg.refill_duration_ms, &new_refilled);
+ refilled, delta_time_ms, bucket->cfg.refill_interval_ms, &new_refilled);
int infinite_flag=0;
if(bucket->cfg.CBS==0 && bucket->cfg.CIR==0)
@@ -69,7 +75,11 @@ long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval
new_refilled += tokens;
}
size_t n_replica=PN_counter_replica_num(bucket->consumed);
-
+ if(refilled!=new_refilled)
+ {
+ bucket->refilled=new_refilled;
+ memcpy(&bucket->refill_timestamp, &now, sizeof(bucket->refill_timestamp));
+ }
long long allocated=0;
if(infinite_flag)
{
@@ -77,19 +87,15 @@ long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval
}
else
{
- allocated=tb_consume(bucket->cfg.CIR, available, n_replica, cmd, tokens);
+ //allocated=tb_consume_quantum_based(available, bucket->deficit, bucket->quantum, cmd, tokens);
+ if(bucket->no_reserve) n_replica=1;
+ allocated=tb_consume_reserve_based(bucket->cfg.CIR, available, n_replica, cmd, tokens);
}
if(allocated>0)
{
PN_counter_incrby(bucket->consumed, allocated);
}
- if(refilled!=new_refilled)
- {
- bucket->refilled=new_refilled;
- memcpy(&bucket->refill_timestamp, &now, sizeof(bucket->refill_timestamp));
- }
- assert(allocated>=0);
-
+ assert(allocated>=0);
return allocated;
}
@@ -99,10 +105,11 @@ void OC_token_bucket_info(struct OC_token_bucket *bucket, struct timeval now, st
info->CIR=bucket->cfg.CIR;
info->CBS=bucket->cfg.CBS;
- info->refill_duration_ms=bucket->cfg.refill_duration_ms;
+ info->refill_interval_ms=bucket->cfg.refill_interval_ms;
info->consumed=PN_counter_get(bucket->consumed);
info->available=tb_available(bucket->cfg.CIR, bucket->cfg.CBS, info->consumed,
- bucket->refilled, delta_time_ms, bucket->cfg.refill_duration_ms, &info->refilled);
+ bucket->refilled, delta_time_ms, bucket->cfg.refill_interval_ms, &info->refilled);
+ info->number_of_replica=PN_counter_replica_num(bucket->consumed);
return;
}
size_t OC_token_bucket_serialized_size(const struct OC_token_bucket *bucket)
diff --git a/CRDT/oc_token_bucket.h b/CRDT/oc_token_bucket.h
index 172a850..26a0a32 100644
--- a/CRDT/oc_token_bucket.h
+++ b/CRDT/oc_token_bucket.h
@@ -23,15 +23,15 @@ struct OC_token_bucket_info
{
long long CIR;
long long CBS;
- long long refill_duration_ms;
+ long long refill_interval_ms;
long long consumed;
long long refilled;
long long available;
- long long number_of_consumers;
+ long long number_of_replica;
};
void OC_token_bucket_info(struct OC_token_bucket *bucket, struct timeval now, struct OC_token_bucket_info *info);
void OC_token_bucket_free(struct OC_token_bucket *bucket);
-
+void OC_token_bucket_set_no_reserve(struct OC_token_bucket *bucket);
long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval now, enum tb_consume_type cmd, long long tokens);
diff --git a/CRDT/token_bucket_common.c b/CRDT/token_bucket_common.c
index 895b392..52d25c5 100644
--- a/CRDT/token_bucket_common.c
+++ b/CRDT/token_bucket_common.c
@@ -30,7 +30,7 @@ long long tb_available(long long CIR, long long CBS, long long consumed, long lo
}
return MAX(*new_refilled-consumed, 0);
}
-long long tb_consume(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens)
+long long tb_consume_reserve_based(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens)
{
long long reserved=CIR*(n_replica-1)/n_replica;
long long local_available=MAX(available-reserved, 0);
@@ -44,7 +44,7 @@ long long tb_consume(long long CIR, long long available, size_t n_replica, enum
allocated=tokens;
break;
case TB_CONSUME_FLEXIBLE:
- allocated=MIN(tokens, local_available);;
+ allocated=MIN(tokens, local_available);
break;
case TB_CONSUME_NORMAL:
allocated=(tokens<=local_available) ? tokens:0;
@@ -54,4 +54,29 @@ long long tb_consume(long long CIR, long long available, size_t n_replica, enum
break;
}
return allocated;
+}
+long long tb_consume_quantum_based(long long available, long long deficit, long long quantum, enum tb_consume_type cmd, long long tokens)
+{
+ long long local_available=MIN(available, quantum-deficit);
+
+ long long allocated=0;
+ switch(cmd)
+ {
+ case TB_CONSUME_AS_MUCH_AS_POSSIBLE:
+ allocated=available;
+ break;
+ case TB_CONSUME_FORCE:
+ allocated=tokens;
+ break;
+ case TB_CONSUME_FLEXIBLE:
+ allocated=MIN(tokens, local_available);
+ break;
+ case TB_CONSUME_NORMAL:
+ allocated=(tokens<=local_available) ? tokens:0;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ return allocated;
} \ No newline at end of file
diff --git a/CRDT/token_bucket_common.h b/CRDT/token_bucket_common.h
index a3f737f..7ad7044 100644
--- a/CRDT/token_bucket_common.h
+++ b/CRDT/token_bucket_common.h
@@ -12,10 +12,11 @@ enum tb_consume_type
TB_CONSUME_NORMAL,
TB_CONSUME_FORCE,
TB_CONSUME_FLEXIBLE,
- TB_CONSUME_AS_MUCH_AS_POSSIBLE,
+ TB_CONSUME_AS_MUCH_AS_POSSIBLE
};
long long tb_available(long long CIR, long long CBS, long long consumed, long long refilled, long long delta_ms, long long refill_interval_ms, long long *new_refilled);
-long long tb_consume(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens);
+long long tb_consume_quantum_based(long long available, long long deficit, long long quantum, enum tb_consume_type cmd, long long tokens);
+long long tb_consume_reserve_based(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens);
#ifdef __cplusplus
}
#endif \ No newline at end of file
diff --git a/docs/design.md b/docs/design.md
index c3a3f0f..e191425 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -240,6 +240,7 @@ For a swarmkv node, it has three modules. Each module has its own internal data
- Keyspace Module stores replica addresses of keys. It's a key-route hash table which has 16384 slots. Each slot is protected by a mutex lock. Lock contention happens when wrting on keys of same slot.
- Monitor Module collects runtime metrics for diagnostic. It's lock-free by taking advantage of [HdrHistogram_c](https://github.com/HdrHistogram/HdrHistogram_c)'s interval recorder.
+[Todo] Lock free API powered by completion queue.
## Source code layout
The source files are organized as follows: