diff options
| author | Zheng Chao <[email protected]> | 2023-06-03 21:35:37 +0800 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2023-06-07 12:13:37 +0000 |
| commit | 052cafc02059fd0a9cc2cf49ccf09dc8c4018a77 (patch) | |
| tree | ed42f58224383ef1be60ecde5dd99ad08b2672e1 | |
| parent | 154e161a45d83957a67d8b1cbe04b3ce22b624cb (diff) | |
WIP
| -rw-r--r-- | CRDT/bulk_token_bucket.c | 2 | ||||
| -rw-r--r-- | CRDT/crdt_base_gtest.cpp | 39 | ||||
| -rw-r--r-- | CRDT/crdt_tb_gtest.cpp | 176 | ||||
| -rw-r--r-- | CRDT/fair_token_bucket.c | 61 | ||||
| -rw-r--r-- | CRDT/g_array.c | 89 | ||||
| -rw-r--r-- | CRDT/oc_token_bucket.c | 39 | ||||
| -rw-r--r-- | CRDT/oc_token_bucket.h | 6 | ||||
| -rw-r--r-- | CRDT/token_bucket_common.c | 29 | ||||
| -rw-r--r-- | CRDT/token_bucket_common.h | 5 | ||||
| -rw-r--r-- | docs/design.md | 1 |
10 files changed, 324 insertions, 123 deletions
diff --git a/CRDT/bulk_token_bucket.c b/CRDT/bulk_token_bucket.c index 9b1a1e3..679a3d1 100644 --- a/CRDT/bulk_token_bucket.c +++ b/CRDT/bulk_token_bucket.c @@ -140,7 +140,7 @@ long long bulk_token_bucket_consume(struct bulk_token_bucket *btb, struct timeva } else { - assigned=tb_consume(btb->cfg.CIR, global_available, n_replica, cmd, tokens); + assigned=tb_consume_reserve_based(btb->cfg.CIR, global_available, n_replica, cmd, tokens); } if(new_refilled!=refilled) diff --git a/CRDT/crdt_base_gtest.cpp b/CRDT/crdt_base_gtest.cpp index 2dfba64..483d501 100644 --- a/CRDT/crdt_base_gtest.cpp +++ b/CRDT/crdt_base_gtest.cpp @@ -1773,6 +1773,45 @@ TEST(GArray, Basic) g_array_free(a[i]); } } +TEST(GArray, Reset) +{ + uuid_t uuid; + size_t n_replica=2; + int array_sz=1024, round=100; + struct g_array *a[n_replica]; + for(size_t i=0; i<n_replica; i++) + { + uuid_generate(uuid); + a[i]=g_array_new(uuid, array_sz); + } + long long idx=1, value=0; + srandom(17); + for(size_t i=0; i<round; i++) + { + g_array_incrby(a[0], idx, 100); + g_array_incrby(a[1], idx, 100); + g_array_sync(a, n_replica); + value=g_array_get(a[0], idx); + EXPECT_EQ(value, 200); + g_array_reset(a[0]); + value=g_array_get(a[0], idx); + EXPECT_EQ(value, 0); + g_array_incrby(a[1], idx, 1); + g_array_sync(a, n_replica); + //Observed Reset + value=g_array_get(a[0], idx); + EXPECT_EQ(value, 1); + value=g_array_get(a[1], idx); + EXPECT_EQ(value, 1); + idx++; + } + + + for(size_t i=0; i<n_replica; i++) + { + g_array_free(a[i]); + } +} TEST(GArray, Serialize) { uuid_t uuid; diff --git a/CRDT/crdt_tb_gtest.cpp b/CRDT/crdt_tb_gtest.cpp index f91ac23..8b3b3ec 100644 --- a/CRDT/crdt_tb_gtest.cpp +++ b/CRDT/crdt_tb_gtest.cpp @@ -36,10 +36,10 @@ enum traffic_type HEAVY_TWO_EIGHT_TYPE, HEAVY_UNIFORM_EXTREME_TYPE }; -const int REPLICA_NUMBER=3; + long long get_request_tokens(int index, enum traffic_type type, long long step_us, long long CIR, long long CBS) { - + const int REPLICA_NUMBER=3; long long request_size=0; long long standard = CIR * step_us / 1000000; long long sd10 = floor((long double)standard * 0.1); @@ -95,6 +95,7 @@ long long get_request_tokens(int index, enum traffic_type type, long long step_u void traffic_distribution(traffic_type type) { + const int REPLICA_NUMBER=3; struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i = 0, j = 0; long long CIR = 1*1024*1024; @@ -108,7 +109,7 @@ void traffic_distribution(traffic_type type) buckets[i] = OC_token_bucket_new(uuid, start, CIR, CBS); } long long tokens = 0, flexible_tokens = 0; - long long consumed = 0, requested = 0, upper_limit = 0, refilled = 0; + long long consumed = 0, requested = 0, upper_limit = 0; long long mimic_duration_us = (long long)100*1000*1000; long long step_us = 100; struct timeval step, now; @@ -131,10 +132,9 @@ void traffic_distribution(traffic_type type) upper_limit = CBS + CIR * timeval_delta_ms(start, now) / 1000; struct OC_token_bucket_info info; OC_token_bucket_info(buckets[0], now, &info); - refilled=info.refilled; EXPECT_LE(consumed, requested); - double accuracy = (double)consumed / MIN(refilled, requested); - EXPECT_NEAR(accuracy, 1, 0.01); + double accuracy = (double)consumed / MIN(upper_limit, requested); + EXPECT_NEAR(accuracy, 1, 0.02); printf("accuracy:%f, upper_limit:%lld, refilled:%lld, requested:%lld, consumed:%lld\n", accuracy, upper_limit, info.refilled, requested, consumed); for(i = 0; i < REPLICA_NUMBER; i++) @@ -300,7 +300,8 @@ TEST(OCTokenBucket, Merge) OC_token_bucket_sync(buckets, 2); now.tv_sec++; - //each has 200-130-30+100/2=90 avaliable tokens + //available=200-130-30+100=140 + //reserved=CIR/2=50, so local avaliable is 90 tokens=OC_token_bucket_consume(buckets[0], now, TB_CONSUME_NORMAL, 91); EXPECT_EQ(tokens, 0); tokens=OC_token_bucket_consume(buckets[1], now, TB_CONSUME_FLEXIBLE, 90); @@ -369,10 +370,10 @@ double OC_token_bucket_test(size_t replica_num, long long mimic_duration_s, long TEST(OCTokenBucket, HeavyConsumer) { double accuracy=0.0; - long long replica_num=0, test_duration_s=500, sync_interval_ms=0; + long long replica_num=0, test_duration_s=200, sync_interval_ms=100; long long i=0, j=0; - for(i=8; i<9; i++) + for(i=6; i<9; i++) { replica_num=i+1; for(j=0; j<10; j++) @@ -395,7 +396,7 @@ TEST(OCTokenBucket, LightConsumer) replica_num=i+1; for(j=0; j<20; j++) { - sync_interval_ms=100*(j+1); + sync_interval_ms=10*(j+1); accuracy=OC_token_bucket_test(replica_num, test_duration_s, sync_interval_ms, 90); EXPECT_NEAR(accuracy, 1, 0.03); } @@ -403,6 +404,7 @@ TEST(OCTokenBucket, LightConsumer) } TEST(OCTokenBucket, ConcurrentHeavyConsumer) { + const int REPLICA_NUMBER=3; struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i=0, j=0; long long CIR=100*1024*1024; @@ -434,7 +436,7 @@ TEST(OCTokenBucket, ConcurrentHeavyConsumer) requested+=tokens; consumed+=OC_token_bucket_consume(buckets[j], now, TB_CONSUME_FLEXIBLE, tokens); } - if(i%100==0) + if(i%80==0) { OC_token_bucket_sync(buckets, REPLICA_NUMBER); } @@ -454,8 +456,63 @@ TEST(OCTokenBucket, ConcurrentHeavyConsumer) OC_token_bucket_free(buckets[i]); } } +TEST(OCTokenBucket, MixTypeConsumer) +{ + const int n_replica=2; + struct OC_token_bucket *buckets[n_replica]; + size_t i=0, j=0; + long long CIR=20*1024*1024; + long long CBS=40*1024*1024; + uuid_t uuid; + struct timeval start; + gettimeofday(&start, NULL); + for(i=0; i<n_replica; i++) + { + uuid_generate(uuid); + buckets[i]=OC_token_bucket_new(uuid, start, CIR, CBS); + } + srandom(17); + long long tokens=0; + long long consumed=0, requested=0, upper_limit=0, refilled=0; + long long mimic_duration_us=(long long)100*1000*1000; + long long step_us=100; + struct timeval step, now; + memcpy(&now, &start, sizeof(now)); + step.tv_sec=0; + step.tv_usec=(suseconds_t)step_us; + for(i=0; (long long)i<mimic_duration_us/step_us; i++) + { + timeradd(&now, &step, &now); + for(j=0; j<n_replica; j++) + { + tokens=CIR*step_us/(1000*1000); + tokens=MAX(3*j*tokens/n_replica, 1); + requested+=tokens; + consumed+=OC_token_bucket_consume(buckets[j%n_replica], now, TB_CONSUME_FLEXIBLE, tokens); + } + if(i%80==0) + { + OC_token_bucket_sync(buckets, n_replica); + } + } + upper_limit=CBS+CIR*timeval_delta_ms(start, now)/1000; + struct OC_token_bucket_info info; + OC_token_bucket_info(buckets[0], now, &info); + refilled=info.refilled; + + EXPECT_NEAR((double)consumed/MIN(refilled, requested), 1, 0.01); + EXPECT_NEAR((double)refilled/upper_limit, 1, 0.01); + EXPECT_LE(consumed, requested); + double accuracy=(double)consumed/MIN(upper_limit, requested); + EXPECT_NEAR(accuracy, 1, 0.01); + for(i=0; i<n_replica; i++) + { + OC_token_bucket_free(buckets[i]); + } +} TEST(OCTokenBucket, Reconfigure) { + const int REPLICA_NUMBER=3; struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i=0, j=0; double accuracy=0.0; @@ -542,6 +599,7 @@ TEST(OCTokenBucket, Reconfigure) } TEST(OCTokenBucket, PartitionTolerance) { + const int REPLICA_NUMBER=3; struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i=0, j=0; long long CIR=1*1024*1024; @@ -689,10 +747,11 @@ void ftb_sync(struct fair_token_bucket **ftb, size_t n_ftb) } return; } +double expected_fairness_index=0.03; double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long long CIR, long long CBS, int duration_s, int n_replica) { uuid_t uuid; - uuid_generate(uuid); + struct timeval start, step, now; int step_us=1000; gettimeofday(&start, NULL); @@ -700,11 +759,13 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l step.tv_sec=0; step.tv_usec=(suseconds_t)step_us; struct fair_token_bucket *ftb[n_replica]; - for(int i=0; i<n_replica; i++) + for(int i=0; i<n_replica+1; i++) { + uuid_generate(uuid); ftb[i]=fair_token_bucket_new(uuid, now, CIR, CBS, 8192); } - int sync_interval_ms=400; + int sync_interval_ms=40; + long long got=0, got1=0; for(int i=0; i<duration_s*(1000*1000/step_us); i++) { timeradd(&now, &step, &now); @@ -714,13 +775,25 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l { int idx=(j+k)%n_class; long long this_demand=classes[idx].requested_CIR*step_us/(1000*1000); - classes[idx].allocated_tokens+=fair_token_bucket_consume(ftb[r], now, + got=fair_token_bucket_consume(ftb[r], now, (const char*) &(classes[idx].class_id), sizeof(classes[idx].class_id), classes[idx].weight, TB_CONSUME_NORMAL, this_demand); + got1=fair_token_bucket_consume(ftb[n_replica], now, + (const char*) &(classes[idx].class_id), + sizeof(classes[idx].class_id), + classes[idx].weight, + TB_CONSUME_NORMAL, + this_demand); + if(got>got1) + { + //printf("%lld %lld\n", got, got1); + } + classes[idx].allocated_tokens+=got; classes[idx].demand_tokens+=this_demand; + } if(0==i%(sync_interval_ms*1000/step_us)) { @@ -729,7 +802,7 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l } long long available_tokens=CIR*duration_s+CBS; double index=max_min_fairness_index(available_tokens, classes, n_class); - int print=0; + int print=1; if(print) { printf("class\tweight\tdemand\tallocated\tideal\r\n"); @@ -756,15 +829,15 @@ double test_fair_token_bucket(struct sftb_class *classes, size_t n_class, long l TEST(FairTokenBucket, Basic) { double index=0; - long long duration_s=350; + long long duration_s=20; struct sftb_class one_heavy_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 1, 20000, 0, 0, 0}, {3, 1, 20000, 0, 0, 0}, {4, 1, 20000, 0, 0, 0}, {5, 1, 50000, 0, 0, 0}}; - index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, 2); + EXPECT_NEAR(index, 1, expected_fairness_index); struct sftb_class all_light_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 1, 20000, 0, 0, 0}, @@ -773,7 +846,7 @@ TEST(FairTokenBucket, Basic) {5, 1, 20000, 0, 0, 0}}; index=test_fair_token_bucket(all_light_classes, 5, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + EXPECT_NEAR(index, 1, expected_fairness_index); struct sftb_class two_heavy_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 1, 20000, 0, 0, 0}, @@ -781,8 +854,8 @@ TEST(FairTokenBucket, Basic) {400, 1, 50000, 0, 0, 0}, {5, 1, 20003, 0, 0, 0}}; - index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 2); + EXPECT_NEAR(index, 1, expected_fairness_index); struct sftb_class all_heavy_classes[5]={{1, 1, 40000, 0, 0, 0}, {2, 1, 40000, 0, 0, 0}, @@ -791,21 +864,35 @@ TEST(FairTokenBucket, Basic) {5, 1, 40000, 0, 0, 0}}; index=test_fair_token_bucket(all_heavy_classes, 5, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + EXPECT_NEAR(index, 1, expected_fairness_index); } +TEST(FairTokenBucket, Debug) +{ + double index=0; + long long duration_s=20; + struct sftb_class two_heavy_classes[5]={{1, 1, 20000, 0, 0, 0}, + {2, 1, 20000, 0, 0, 0}, + {3, 1, 50000, 0, 0, 0}, + {400, 1, 50000, 0, 0, 0}, + {5, 1, 20003, 0, 0, 0}}; + + index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 2); + EXPECT_NEAR(index, 1, expected_fairness_index); +} TEST(FairTokenBucket, Replicas) { double index=0; long long duration_s=350; + int n_replica=2; struct sftb_class one_heavy_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 1, 20000, 0, 0, 0}, {3, 1, 20000, 0, 0, 0}, {4, 1, 20000, 0, 0, 0}, {5, 1, 50000, 0, 0, 0}}; - index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, 2); - EXPECT_NEAR(index, 1, 0.02); + index=test_fair_token_bucket(one_heavy_classes, 1, 120000, 200000, duration_s, n_replica); + EXPECT_NEAR(index, 1, expected_fairness_index); struct sftb_class two_heavy_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 1, 20000, 0, 0, 0}, @@ -813,22 +900,23 @@ TEST(FairTokenBucket, Replicas) {400, 1, 50000, 0, 0, 0}, {5, 1, 20003, 0, 0, 0}}; - index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, 2); - EXPECT_NEAR(index, 1, 0.02); + index=test_fair_token_bucket(two_heavy_classes, 5, 100000, 200000, duration_s, n_replica); + EXPECT_NEAR(index, 1, expected_fairness_index); } TEST(FairTokenBucket, Weight) { double index=0; long long duration_s=500; + int n_replica=3; struct sftb_class one_heavy_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 2, 20000, 0, 0, 0}, {3, 3, 20000, 0, 0, 0}, {4, 4, 30000, 0, 0, 0}, {5, 5, 50000, 0, 0, 0}}; - index=test_fair_token_bucket(one_heavy_classes, 5, 120000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + double one_heavy_index=test_fair_token_bucket(one_heavy_classes, 5, 120000, 200000, duration_s, n_replica); + EXPECT_NEAR(one_heavy_index, 1, expected_fairness_index); struct sftb_class t1_heavy_classes[5]={{1, 1, 40000, 0, 0, 0}, {2, 2, 20000, 0, 0, 0}, @@ -836,8 +924,8 @@ TEST(FairTokenBucket, Weight) {4, 4, 30000, 0, 0, 0}, {5, 5, 50000, 0, 0, 0}}; - index=test_fair_token_bucket(t1_heavy_classes, 5, 120000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + double t1_heavy_index=test_fair_token_bucket(t1_heavy_classes, 5, 120000, 200000, duration_s, n_replica); + EXPECT_NEAR(t1_heavy_index, 1, expected_fairness_index); struct sftb_class t2_heavy_classes[5]={{1, 1, 40000, 0, 0, 0}, {2, 2, 20000, 0, 0, 0}, @@ -845,8 +933,8 @@ TEST(FairTokenBucket, Weight) {4, 4, 30000, 0, 0, 0}, {5, 10, 50000, 0, 0, 0}}; - index=test_fair_token_bucket(t2_heavy_classes, 5, 120000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + double t2_heavy_index=test_fair_token_bucket(t2_heavy_classes, 5, 120000, 200000, duration_s, n_replica); + EXPECT_NEAR(t2_heavy_index, 1, 0.02); struct sftb_class t3_heavy_classes[5]={{1, 1, 40000, 0, 0, 0}, {2, 2, 20000, 0, 0, 0}, @@ -854,8 +942,8 @@ TEST(FairTokenBucket, Weight) {4, 4, 20000, 0, 0, 0}, {5, 10, 10000, 0, 0, 0}}; - index=test_fair_token_bucket(t3_heavy_classes, 5, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + double t3_heavy_index=test_fair_token_bucket(t3_heavy_classes, 5, 100000, 200000, duration_s, n_replica); + EXPECT_NEAR(t3_heavy_index, 1, expected_fairness_index); struct sftb_class all_light_classes[5]={{1, 1, 20000, 0, 0, 0}, {2, 2, 20000, 0, 0, 0}, @@ -863,8 +951,8 @@ TEST(FairTokenBucket, Weight) {4, 4, 20000, 0, 0, 0}, {5, 10, 23000, 0, 0, 0}}; - index=test_fair_token_bucket(all_light_classes, 5, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + index=test_fair_token_bucket(all_light_classes, 5, 100000, 200000, duration_s, n_replica); + EXPECT_NEAR(index, 1, expected_fairness_index); struct sftb_class light_with_big_player[5]={{1, 1, 10000, 0, 0, 0}, {2, 2, 10000, 0, 0, 0}, @@ -872,10 +960,10 @@ TEST(FairTokenBucket, Weight) {4, 4, 10000, 0, 0, 0}, {5, 10, 60000, 0, 0, 0}}; - index=test_fair_token_bucket(light_with_big_player, 5, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + double all_light_index=test_fair_token_bucket(light_with_big_player, 5, 100000, 200000, duration_s, n_replica); + EXPECT_NEAR(all_light_index, 1, expected_fairness_index); - struct sftb_class sandvine_active_logic[10]={{1, 5, 30000, 0, 0, 0}, + struct sftb_class sd_al[10]={{1, 5, 30000, 0, 0, 0}, {2, 5, 30000, 0, 0, 0}, {3, 3, 12000, 0, 0, 0}, {4, 3, 12000, 0, 0, 0}, @@ -886,8 +974,8 @@ TEST(FairTokenBucket, Weight) {9, 2, 9000, 0, 0, 0}, {10, 2, 20000, 0, 0, 0}}; - index=test_fair_token_bucket(sandvine_active_logic, 10, 100000, 200000, duration_s, 1); - EXPECT_NEAR(index, 1, 0.02); + double sd_al_index=test_fair_token_bucket(sd_al, 10, 100000, 200000, duration_s, n_replica); + EXPECT_NEAR(sd_al_index, 1, expected_fairness_index); } TEST(FairTokenBucket, Weight5000) { @@ -904,7 +992,7 @@ TEST(FairTokenBucket, Weight5000) very_heavy_classes[i].weight=i%20+1; } double index=test_fair_token_bucket(very_heavy_classes, n_class, CIR, CBS, 40, 1); - EXPECT_NEAR(index, 1, 0.02); + EXPECT_NEAR(index, 1, expected_fairness_index); struct sftb_class slight_heavy_classes[n_class]; memset(slight_heavy_classes, 0, sizeof(slight_heavy_classes)); @@ -915,7 +1003,7 @@ TEST(FairTokenBucket, Weight5000) slight_heavy_classes[i].weight=i%20+1; } index=test_fair_token_bucket(slight_heavy_classes, n_class, CIR, CBS, 40, 1); - EXPECT_NEAR(index, 1, 0.02); + EXPECT_NEAR(index, 1, expected_fairness_index); } struct btb_key diff --git a/CRDT/fair_token_bucket.c b/CRDT/fair_token_bucket.c index 66e3976..d38dcec 100644 --- a/CRDT/fair_token_bucket.c +++ b/CRDT/fair_token_bucket.c @@ -11,7 +11,7 @@ #include <stdint.h> -#define REFILL_INTERVAL_MS 200 +#define PERTUB_INTERVAL_MS 100 struct fair_token_bucket @@ -25,9 +25,13 @@ struct fair_token_bucket struct OC_token_bucket *bucket; /* Local Variables*/ - struct timeval last_refill_time; - long long per_weight_quantum; //updated every REFILL_INTERVAL_MS - long long n_active_key; + struct timeval last_pertub; + long long per_weight_quantum; //updated every PERTUB_INTERVAL_MS + long long total_weight; + long long n_active_key; + long long debug_last_defict; + long long debug_last_allocated; + long long debug_consume_call; }; struct fair_token_bucket *fair_token_bucket_new(uuid_t uuid, struct timeval now, long long CIR, long long CBS, long long divisor) { @@ -41,6 +45,8 @@ struct fair_token_bucket *fair_token_bucket_new(uuid_t uuid, struct timeval now, } ftb->sfq=g_array_new(uuid, ftb->divisor); ftb->bucket=OC_token_bucket_new(uuid, now, CIR, CBS); + OC_token_bucket_set_no_reserve(ftb->bucket); + ftb->per_weight_quantum=CIR; return ftb; } void fair_token_bucket_configure(struct fair_token_bucket *ftb, struct timeval now, long long CIR, long long CBS, long long divisor) @@ -64,13 +70,8 @@ void fair_token_bucket_free(struct fair_token_bucket *ftb) free(ftb); } -static void ftb_refill(struct fair_token_bucket *ftb, struct timeval now) +static void ftb_quantum_estimation(struct fair_token_bucket *ftb, struct timeval now) { - long long delta_time_ms=timeval_delta_ms(ftb->last_refill_time, now); - if(likely(delta_time_ms<REFILL_INTERVAL_MS)) - { - return; - } //Per weight quantum estimation long long total_weight=0, n_active_key=0, count=0; for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++) @@ -81,21 +82,32 @@ static void ftb_refill(struct fair_token_bucket *ftb, struct timeval now) } struct OC_token_bucket_info info; OC_token_bucket_info(ftb->bucket, now, &info); - long long available=info.available; + long long available=info.available; + //available=MAX(available-info.CIR*(2-1)/2, 0); + + ftb->total_weight=total_weight; ftb->per_weight_quantum=available/MAX(1, total_weight); ftb->n_active_key=n_active_key; - - memcpy(&ftb->last_refill_time, &now, sizeof(ftb->last_refill_time)); +} +static void ftb_perturb(struct fair_token_bucket *ftb, struct timeval now) +{ + ftb_quantum_estimation(ftb, now); g_array_reset(ftb->sfq); ftb->perturb_seed++; } - long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeval now, const char *key, size_t keylen, long long weight, enum tb_consume_type cmd, long long tokens) { + ftb->debug_consume_call++; if(weight>FAIR_TB_WEIGHT_MAX || weight<0) return -1; ST_hyperloglog_add(ftb->hll[weight-1], key, keylen, now); - ftb_refill(ftb, now); + + long long delta_time_ms=timeval_delta_ms(ftb->last_pertub, now); + if(unlikely(delta_time_ms>=PERTUB_INTERVAL_MS)) + { + ftb_perturb(ftb, now); + memcpy(&ftb->last_pertub, &now, sizeof(ftb->last_pertub)); + } int sfq_idx=0; sfq_idx=XXH3_64bits_withSeed(key, keylen, ftb->perturb_seed)%ftb->divisor; @@ -103,12 +115,18 @@ long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeva long long deficit_est=0; deficit_est=g_array_get(ftb->sfq, sfq_idx); deficit_est/=MAX(1, ftb->n_active_key/ftb->divisor); + ftb->debug_last_defict=deficit_est; if(tokens + deficit_est > ftb->per_weight_quantum*weight) { + ftb->debug_last_allocated=-1; return 0; } long long allocated_tokens=OC_token_bucket_consume(ftb->bucket, now, cmd, tokens); - if(allocated_tokens) g_array_incrby(ftb->sfq, sfq_idx, allocated_tokens); + if(allocated_tokens) + { + g_array_incrby(ftb->sfq, sfq_idx, allocated_tokens); + } + ftb->debug_last_allocated=allocated_tokens; return allocated_tokens; } @@ -125,6 +143,7 @@ struct ftb_header long long payload_sz; long long divisor; long long pertub_seed; + struct timeval last_pertub; }; size_t fair_token_bucket_serialized_size(const struct fair_token_bucket *ftb) { @@ -147,7 +166,7 @@ void fair_token_bucket_serialize(const struct fair_token_bucket *ftb, char **blo hdr.payload_sz=sz; hdr.divisor=ftb->divisor; hdr.pertub_seed=ftb->perturb_seed; - + memcpy(&hdr.last_pertub, &ftb->last_pertub, sizeof(hdr.last_pertub)); char *buffer=ALLOC(char, sz); memcpy(buffer+offset, &hdr, sizeof(hdr)); offset += sizeof(hdr); @@ -185,6 +204,7 @@ struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t struct fair_token_bucket *ftb=ALLOC(struct fair_token_bucket, 1); ftb->perturb_seed=hdr.pertub_seed; ftb->divisor=hdr.divisor; + memcpy(&ftb->last_pertub, &hdr.last_pertub, sizeof(ftb->last_pertub)); for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++) { ftb->hll[i]=ST_hyperloglog_deserialize(blob+offset, blob_sz-offset); @@ -199,12 +219,17 @@ struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t } void fair_token_bucket_merge(struct fair_token_bucket *dst, const struct fair_token_bucket *src) { - dst->perturb_seed=MAX(dst->perturb_seed, src->perturb_seed); dst->divisor=MAX(dst->divisor, src->divisor); for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++) { ST_hyperloglog_merge(dst->hll[i], src->hll[i]); } + if(timercmp(&dst->last_pertub, &src->last_pertub, <))//Last-Write-Wins + { + dst->perturb_seed=src->perturb_seed; + memcpy(&dst->last_pertub, &src->last_pertub, sizeof(dst->last_pertub)); + ftb_quantum_estimation(dst, dst->last_pertub); + } g_array_merge(dst->sfq, src->sfq); OC_token_bucket_merge(dst->bucket, src->bucket); return; diff --git a/CRDT/g_array.c b/CRDT/g_array.c index 2272bfc..f6cd7b3 100644 --- a/CRDT/g_array.c +++ b/CRDT/g_array.c @@ -7,14 +7,13 @@ #define TO_LOCAL_EPOCH(x) (x & ((1ULL << 24) - 1)) struct counter_item { - long long epoch:24; - long long count:40; + long long count; }; struct counter_array { uuid_t replica_id; - long long array_sz; long long sequence; + long long array_sz; struct counter_item *array; UT_hash_handle hh; }; @@ -44,7 +43,7 @@ struct g_array *g_array_new(uuid_t my_id, long long array_sz) struct g_array *ga=ALLOC(struct g_array, 1); uuid_copy(ga->my_id, my_id); ga->array_sz=array_sz; - return ga; + return ga; } void g_array_free(struct g_array *ga) { @@ -57,15 +56,24 @@ void g_array_free(struct g_array *ga) free(ga); return; } - +static void update_global_epoch(struct g_array *ga) +{ + struct counter_array *a=NULL, *tmp=NULL; + ga->epoch=0; + HASH_ITER(hh, ga->hash, a, tmp) + { + // ga->epoch+=a->epoch; + } + return; +} long long g_array_get(const struct g_array *ga, long long idx) { struct counter_array *a=NULL, *tmp=NULL; long long value=0; HASH_ITER(hh, ga->hash, a, tmp) { - if(idx < a->array_sz && a->array[idx].epoch == TO_LOCAL_EPOCH(ga->epoch)) - value += a->array[idx].count; + if(idx < a->array_sz) + value += a->array[idx].count; } return value; } @@ -80,18 +88,25 @@ long long g_array_incrby(struct g_array *ga, long long idx, long long increment) HASH_ADD_KEYPTR(hh, ga->hash, a->replica_id, sizeof(a->replica_id), a); } assert(idx < a->array_sz); - if(a->array[idx].epoch != TO_LOCAL_EPOCH(ga->epoch)) - { - a->array[idx].epoch = TO_LOCAL_EPOCH(ga->epoch); - a->array[idx].count = 0; - } a->array[idx].count += increment; - a->sequence ++; + a->sequence++; return g_array_get(ga, idx); } void g_array_reset(struct g_array *ga) { - ga->epoch++; + struct counter_array *a=NULL; + + HASH_FIND(hh, ga->hash, ga->my_id, sizeof(ga->my_id), a); + if(!a) + { + a=counter_array_new(ga->my_id, ga->array_sz); + HASH_ADD_KEYPTR(hh, ga->hash, a->replica_id, sizeof(a->replica_id), a); + } + for(size_t i=0; i<ga->array_sz; i++) + { + a->array[i].count -= g_array_get(ga, i); + } + a->sequence++; } void g_array_resize(struct g_array *ga, long long new_size) { @@ -99,13 +114,12 @@ void g_array_resize(struct g_array *ga, long long new_size) HASH_FIND(hh, ga->hash, ga->my_id, sizeof(ga->my_id), a); if(a && a->array_sz < new_size) { - a->array=(struct counter_item *)realloc(a->array, sizeof(struct counter_item)*new_size); - a->array_sz=new_size; - a->sequence++; + a->array=(struct counter_item *)realloc(a->array, sizeof(struct counter_item)*new_size); + a->array_sz=new_size; } ga->array_sz=new_size; - return; + return; } size_t g_array_replicas(const struct g_array *ga) { @@ -120,9 +134,9 @@ size_t g_array_serialized_size(const struct g_array *ga) HASH_ITER(hh, ga->hash, item, tmp) { sz += G_ARRAY_ITEM_HEADER_SIZE; - sz += item->array_sz*sizeof(struct counter_item); + sz += item->array_sz*sizeof(struct counter_item); } - return sz; + return sz; } void g_array_serialize(const struct g_array *ga, char **blob, size_t *blob_sz) { @@ -172,43 +186,44 @@ struct g_array * g_array_deserialize(const char *blob, size_t blob_sz) assert(offset<=blob_sz); return ga; } + void g_array_merge(struct g_array *dst, const struct g_array *src) { - struct counter_array *src_item=NULL, *dst_item=NULL, *tmp=NULL; + struct counter_array *src_item=NULL, *dst_item=NULL, *tmp=NULL; long long max_array_sz=0; - HASH_ITER(hh, src->hash, src_item, tmp) - { - HASH_FIND(hh, dst->hash, src_item->replica_id, sizeof(src_item->replica_id), dst_item); - if(!dst_item) - { - dst_item=ALLOC(struct counter_array, 1); + HASH_ITER(hh, src->hash, src_item, tmp) + { + HASH_FIND(hh, dst->hash, src_item->replica_id, sizeof(src_item->replica_id), dst_item); + if(!dst_item) + { + dst_item=ALLOC(struct counter_array, 1); memcpy(dst_item, src_item, G_ARRAY_ITEM_HEADER_SIZE); - dst_item->array=ALLOC(struct counter_item, dst_item->array_sz); - memcpy(dst_item->array, src_item->array, dst_item->array_sz*sizeof(struct counter_item)); + dst_item->array=ALLOC(struct counter_item, dst_item->array_sz); + memcpy(dst_item->array, src_item->array, dst_item->array_sz*sizeof(struct counter_item)); HASH_ADD_KEYPTR(hh, dst->hash, dst_item->replica_id, sizeof(dst_item->replica_id), dst_item); - } + } else { if(src_item->sequence > dst_item->sequence) { - dst_item->array_sz=src_item->array_sz; + memcpy(dst_item, src_item, G_ARRAY_ITEM_HEADER_SIZE); dst_item->array=realloc(dst_item->array, sizeof(struct counter_item)*dst_item->array_sz); memcpy(dst_item->array, src_item->array, sizeof(struct counter_item)*dst_item->array_sz); } } max_array_sz=MAX(src_item->array_sz, max_array_sz); - } - dst->epoch=MAX(dst->epoch, src->epoch); + } + update_global_epoch(dst); if(dst->array_sz<max_array_sz) { g_array_resize(dst, max_array_sz); } - return; + return; } void g_array_merge_blob(struct g_array *ga, const char *blob, size_t blob_sz) { struct g_array *src=g_array_deserialize(blob, blob_sz); - g_array_merge(ga, src); - g_array_free(src); - return; + g_array_merge(ga, src); + g_array_free(src); + return; }
\ No newline at end of file diff --git a/CRDT/oc_token_bucket.c b/CRDT/oc_token_bucket.c index 502596c..1facf43 100644 --- a/CRDT/oc_token_bucket.c +++ b/CRDT/oc_token_bucket.c @@ -6,12 +6,12 @@ #include <assert.h> #include <string.h> - +#define OCTB_REFILL_INTERVAL_MS_DEFAULT 10 struct OC_configuration { long long CIR; //Committed Information Rate long long CBS; //Committed Burst Size - long long refill_duration_ms; + long long refill_interval_ms; struct timeval write_timestamp; }; struct OC_token_bucket @@ -20,6 +20,9 @@ struct OC_token_bucket long long refilled; struct timeval refill_timestamp; struct PN_counter *consumed; + + /* Local variables */ + int no_reserve; }; const size_t OCTB_BLOB_HDR_SIZE= offsetof(struct OC_token_bucket, consumed); struct OC_token_bucket *OC_token_bucket_new(uuid_t my_id, struct timeval now, long long CIR, long long CBS) @@ -30,7 +33,7 @@ struct OC_token_bucket *OC_token_bucket_new(uuid_t my_id, struct timeval now, lo memcpy(&bucket->cfg.write_timestamp, &now, sizeof(bucket->cfg.write_timestamp)); bucket->cfg.CIR=CIR; bucket->cfg.CBS=CBS; - bucket->cfg.refill_duration_ms=10; + bucket->cfg.refill_interval_ms=OCTB_REFILL_INTERVAL_MS_DEFAULT; memcpy(&bucket->refill_timestamp, &now, sizeof(bucket->cfg.write_timestamp)); bucket->refilled=bucket->cfg.CBS; return bucket; @@ -48,7 +51,10 @@ void OC_token_bucket_configure(struct OC_token_bucket *bucket, struct timeval no if(CIR>=0) bucket->cfg.CIR=CIR; if(CBS>=0) bucket->cfg.CBS=CBS; } - +void OC_token_bucket_set_no_reserve(struct OC_token_bucket *bucket) +{ + bucket->no_reserve=1; +} long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval now, enum tb_consume_type cmd, long long tokens) { @@ -60,7 +66,7 @@ long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval long long new_refilled=0; long long available=tb_available(bucket->cfg.CIR, bucket->cfg.CBS, consumed, - refilled, delta_time_ms, bucket->cfg.refill_duration_ms, &new_refilled); + refilled, delta_time_ms, bucket->cfg.refill_interval_ms, &new_refilled); int infinite_flag=0; if(bucket->cfg.CBS==0 && bucket->cfg.CIR==0) @@ -69,7 +75,11 @@ long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval new_refilled += tokens; } size_t n_replica=PN_counter_replica_num(bucket->consumed); - + if(refilled!=new_refilled) + { + bucket->refilled=new_refilled; + memcpy(&bucket->refill_timestamp, &now, sizeof(bucket->refill_timestamp)); + } long long allocated=0; if(infinite_flag) { @@ -77,19 +87,15 @@ long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval } else { - allocated=tb_consume(bucket->cfg.CIR, available, n_replica, cmd, tokens); + //allocated=tb_consume_quantum_based(available, bucket->deficit, bucket->quantum, cmd, tokens); + if(bucket->no_reserve) n_replica=1; + allocated=tb_consume_reserve_based(bucket->cfg.CIR, available, n_replica, cmd, tokens); } if(allocated>0) { PN_counter_incrby(bucket->consumed, allocated); } - if(refilled!=new_refilled) - { - bucket->refilled=new_refilled; - memcpy(&bucket->refill_timestamp, &now, sizeof(bucket->refill_timestamp)); - } - assert(allocated>=0); - + assert(allocated>=0); return allocated; } @@ -99,10 +105,11 @@ void OC_token_bucket_info(struct OC_token_bucket *bucket, struct timeval now, st info->CIR=bucket->cfg.CIR; info->CBS=bucket->cfg.CBS; - info->refill_duration_ms=bucket->cfg.refill_duration_ms; + info->refill_interval_ms=bucket->cfg.refill_interval_ms; info->consumed=PN_counter_get(bucket->consumed); info->available=tb_available(bucket->cfg.CIR, bucket->cfg.CBS, info->consumed, - bucket->refilled, delta_time_ms, bucket->cfg.refill_duration_ms, &info->refilled); + bucket->refilled, delta_time_ms, bucket->cfg.refill_interval_ms, &info->refilled); + info->number_of_replica=PN_counter_replica_num(bucket->consumed); return; } size_t OC_token_bucket_serialized_size(const struct OC_token_bucket *bucket) diff --git a/CRDT/oc_token_bucket.h b/CRDT/oc_token_bucket.h index 172a850..26a0a32 100644 --- a/CRDT/oc_token_bucket.h +++ b/CRDT/oc_token_bucket.h @@ -23,15 +23,15 @@ struct OC_token_bucket_info { long long CIR; long long CBS; - long long refill_duration_ms; + long long refill_interval_ms; long long consumed; long long refilled; long long available; - long long number_of_consumers; + long long number_of_replica; }; void OC_token_bucket_info(struct OC_token_bucket *bucket, struct timeval now, struct OC_token_bucket_info *info); void OC_token_bucket_free(struct OC_token_bucket *bucket); - +void OC_token_bucket_set_no_reserve(struct OC_token_bucket *bucket); long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval now, enum tb_consume_type cmd, long long tokens); diff --git a/CRDT/token_bucket_common.c b/CRDT/token_bucket_common.c index 895b392..52d25c5 100644 --- a/CRDT/token_bucket_common.c +++ b/CRDT/token_bucket_common.c @@ -30,7 +30,7 @@ long long tb_available(long long CIR, long long CBS, long long consumed, long lo } return MAX(*new_refilled-consumed, 0); } -long long tb_consume(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens) +long long tb_consume_reserve_based(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens) { long long reserved=CIR*(n_replica-1)/n_replica; long long local_available=MAX(available-reserved, 0); @@ -44,7 +44,7 @@ long long tb_consume(long long CIR, long long available, size_t n_replica, enum allocated=tokens; break; case TB_CONSUME_FLEXIBLE: - allocated=MIN(tokens, local_available);; + allocated=MIN(tokens, local_available); break; case TB_CONSUME_NORMAL: allocated=(tokens<=local_available) ? tokens:0; @@ -54,4 +54,29 @@ long long tb_consume(long long CIR, long long available, size_t n_replica, enum break; } return allocated; +} +long long tb_consume_quantum_based(long long available, long long deficit, long long quantum, enum tb_consume_type cmd, long long tokens) +{ + long long local_available=MIN(available, quantum-deficit); + + long long allocated=0; + switch(cmd) + { + case TB_CONSUME_AS_MUCH_AS_POSSIBLE: + allocated=available; + break; + case TB_CONSUME_FORCE: + allocated=tokens; + break; + case TB_CONSUME_FLEXIBLE: + allocated=MIN(tokens, local_available); + break; + case TB_CONSUME_NORMAL: + allocated=(tokens<=local_available) ? tokens:0; + break; + default: + assert(0); + break; + } + return allocated; }
\ No newline at end of file diff --git a/CRDT/token_bucket_common.h b/CRDT/token_bucket_common.h index a3f737f..7ad7044 100644 --- a/CRDT/token_bucket_common.h +++ b/CRDT/token_bucket_common.h @@ -12,10 +12,11 @@ enum tb_consume_type TB_CONSUME_NORMAL, TB_CONSUME_FORCE, TB_CONSUME_FLEXIBLE, - TB_CONSUME_AS_MUCH_AS_POSSIBLE, + TB_CONSUME_AS_MUCH_AS_POSSIBLE }; long long tb_available(long long CIR, long long CBS, long long consumed, long long refilled, long long delta_ms, long long refill_interval_ms, long long *new_refilled); -long long tb_consume(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens); +long long tb_consume_quantum_based(long long available, long long deficit, long long quantum, enum tb_consume_type cmd, long long tokens); +long long tb_consume_reserve_based(long long CIR, long long available, size_t n_replica, enum tb_consume_type cmd, long long tokens); #ifdef __cplusplus } #endif
\ No newline at end of file diff --git a/docs/design.md b/docs/design.md index c3a3f0f..e191425 100644 --- a/docs/design.md +++ b/docs/design.md @@ -240,6 +240,7 @@ For a swarmkv node, it has three modules. Each module has its own internal data - Keyspace Module stores replica addresses of keys. It's a key-route hash table which has 16384 slots. Each slot is protected by a mutex lock. Lock contention happens when wrting on keys of same slot. - Monitor Module collects runtime metrics for diagnostic. It's lock-free by taking advantage of [HdrHistogram_c](https://github.com/HdrHistogram/HdrHistogram_c)'s interval recorder. +[Todo] Lock free API powered by completion queue. ## Source code layout The source files are organized as follows: |
