#include "oc_token_bucket.h" #include "fair_token_bucket.h" #include "bulk_token_bucket.h" #include "crdt_utils.h" #include #include //usleep #include #include void OC_token_bucket_sync(struct OC_token_bucket *list[], size_t n) { char *blob=NULL; size_t blob_sz=0; for(size_t i=0; i= eight_replica_num && scope_flag) { request_size = (long long)floor((long double)standard * 1.6) + rand_sd10; } else { request_size = (long long)floor((long double)standard * 1.6) - rand_sd10; } break; case HEAVY_TWO_EIGHT_TYPE: if (index < eight_replica_num && scope_flag) { request_size = (long long)floor((long double)standard * 0.6) + rand_sd10; } else if (index < eight_replica_num && !scope_flag) { request_size = (long long)floor((long double)standard * 0.6) - rand_sd10; } else if (index >= eight_replica_num && scope_flag) { request_size = (long long)floor((long double)standard * 4.8) + rand_sd10; } else { request_size = (long long)floor((long double)standard * 4.8) - rand_sd10; } break; case HEAVY_UNIFORM_EXTREME_TYPE: request_size = (long long)floor((long double)standard * 2); if (scope_flag) { request_size += rand_sd10; } else { request_size -= rand_sd10; } break; default: break; } return request_size; } void traffic_distribution(traffic_type type) { const int REPLICA_NUMBER=3; struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i = 0, j = 0; long long CIR = 1*1024*1024; long long CBS = 2*1024*1024; uuid_t uuid; struct timeval start; gettimeofday(&start, NULL); for (i = 0; i < REPLICA_NUMBER; i++) { uuid_generate(uuid); buckets[i] = OC_token_bucket_new(uuid, start, CIR, 1, CBS); } long long tokens = 0, flexible_tokens = 0; long long consumed = 0, requested = 0, upper_limit = 0; long long mimic_duration_us = (long long)100*1000*1000; long long step_us = 100; struct timeval step, now; now = start; step.tv_sec = 0; step.tv_usec = (suseconds_t)step_us; for (i = 0; (long long)i < mimic_duration_us / step_us; i++) { j = i % 3; // sequence selection timeradd(&now, &step, &now); tokens = get_request_tokens(j, type, step_us, CIR, CBS); flexible_tokens = OC_token_bucket_consume(buckets[j], now, TB_CONSUME_FLEXIBLE, tokens); requested += tokens; consumed += flexible_tokens; if(i%100==0) { OC_token_bucket_sync(buckets, REPLICA_NUMBER); } } upper_limit = CBS + CIR * timeval_delta_ms(start, now) / 1000; struct OC_token_bucket_info info; OC_token_bucket_info(buckets[0], now, &info); EXPECT_LE(consumed, requested); double accuracy = (double)consumed / MIN(upper_limit, requested); EXPECT_NEAR(accuracy, 1, 0.02); printf("accuracy:%f, upper_limit:%lld, refilled:%lld, requested:%lld, consumed:%lld\n", accuracy, upper_limit, info.refilled, requested, consumed); for(i = 0; i < REPLICA_NUMBER; i++) { OC_token_bucket_free(buckets[i]); } } TEST(OCTokenBucket, TrafficTypeConsumer) { traffic_distribution(LIGHT_UNIFORM_TYPE); traffic_distribution(LIGHT_TWO_EIGHT_TYPE); traffic_distribution(HEAVY_TWO_EIGHT_TYPE); traffic_distribution(HEAVY_UNIFORM_EXTREME_TYPE); } TEST(OCTokenBucket, Basic) { uuid_t uuid; uuid_generate(uuid); struct OC_token_bucket *bucket=NULL; long long CIR=100; long long CBS=200; struct timeval now; gettimeofday(&now, NULL); bucket=OC_token_bucket_new(uuid, now, CIR, 1, CBS); long long tokens=0; tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 140); EXPECT_EQ(tokens, 140); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 61); EXPECT_EQ(tokens, 0); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 60); EXPECT_EQ(tokens, 60); now.tv_sec++; tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 90); EXPECT_EQ(tokens, 90); struct OC_token_bucket_info info; OC_token_bucket_info(bucket, now, &info); EXPECT_GE(info.available, 10); // printf("avail=%lld\n", tokens); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 10); EXPECT_EQ(tokens, 10); OC_token_bucket_free(bucket); } TEST(OCTokenBucket, Period) { uuid_t uuid; uuid_generate(uuid); struct OC_token_bucket *bucket=NULL; long long rate=100; long long capacity=40; long long period=300; struct timeval now; gettimeofday(&now, NULL); bucket=OC_token_bucket_new(uuid, now, rate, period, capacity); long long tokens=0; tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, capacity); EXPECT_EQ(tokens, capacity); now.tv_sec+=period/10; tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 1+rate/10); EXPECT_EQ(tokens, 0); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, rate/10); EXPECT_EQ(tokens, rate/10); OC_token_bucket_free(bucket); } TEST(OCTokenBucket, Serialize) { uuid_t uuid; int n_replica=4; long long CIR=100; long long CBS=200; struct timeval now; gettimeofday(&now, NULL); struct OC_token_bucket *b[n_replica], *b1=NULL; for(int i=0; iupper_limit?upper_limit-allocated:0)); double allowed_error=(double)1/replica_num/mimic_duration_s; EXPECT_NEAR((double)allocated/MIN(refilled, requested), 1, MAX(allowed_error, 0.03)); EXPECT_LE(allocated, requested); double accuracy=(double)allocated/MIN(upper_limit, requested); // double real_saturation=(double)(requested)/(upper_limit); // EXPECT_NEAR(real_saturation, (double)saturation_percent/100, 0.1); for(i=0; imimic_duration_us/step_us/3 && (long long)i<= 2*mimic_duration_us/step_us/3) { continue; } //if(i%100==0) { OC_token_bucket_sync(buckets, REPLICA_NUMBER); } } upper_limit=CBS+(CIR*timeval_delta_ms(start, now)/1000)*2/3 + (CIR*timeval_delta_ms(start, now)/1000)*REPLICA_NUMBER/3; struct OC_token_bucket_info info; OC_token_bucket_info(buckets[0], now, &info); refilled = info.refilled; EXPECT_NEAR((double)consumed/MIN(refilled, requested), 1, 0.01); EXPECT_LE(consumed, requested); double accuracy=(double)consumed/MIN(upper_limit, requested); EXPECT_NEAR(accuracy, 1, 0.01); for(i=0; idemanded_tokens-rb->demanded_tokens); } double max_min_fairness_index(long long available_tokens, struct ftb_class * classes, size_t n_class) { qsort(classes, n_class, sizeof(struct ftb_class), cmp_ftb_member); long long total_weight=0; for(size_t i=0; i0 ) { long long share=left_tokens/left_weight; for(size_t i=0; in_replica=1; mycase->divisor=8192; mycase->perturb_interval_ms=10; mycase->period=1; mycase->print=1; mycase->sync_interval_ms=100; } double expected_fairness_index=0.04; double test_fair_token_bucket(struct ftb_class *classes, size_t n_class, struct ftb_case *mycase) { uuid_t uuid; srand(171); for(size_t i=0; in_replica; struct fair_token_bucket *ftb[n_replica]; for(int i=0; irate, mycase->period, mycase->capacity, mycase->divisor); fair_token_bucket_set_pertub_interval(ftb[i], mycase->perturb_interval_ms); } int sync_interval_ms=mycase->sync_interval_ms; long long got=0; int r=0; struct timeval last_sync=now; while(timeval_delta_ms(start, now)duration_s*1000) { timeradd(&now, &step, &now); int k=random()%n_class; for(size_t j=0; jrandomization && idx==(int)(now.tv_sec%n_class)) { continue; } r=idx%n_replica;// each class sticks to one replica. //r=0; classes[idx].next_demand += (double)classes[idx].rate*step_us/(1000*1000); if(classes[idx].next_demand < 1) { continue; } long long this_demand = (long long)floor(classes[idx].next_demand); classes[idx].next_demand -= this_demand; assert(classes[idx].next_demand >= 0); got=fair_token_bucket_consume(ftb[r], now, (const char*) &(classes[idx].class_id), sizeof(classes[idx].class_id), classes[idx].weight, TB_CONSUME_NORMAL, this_demand); classes[idx].allocated_tokens += got; classes[idx].demanded_tokens += this_demand; } if(timeval_delta_ms(last_sync, now) > mycase->sync_interval_ms) { ftb_sync(ftb, n_replica); last_sync=now; //r=i%n_replica; } } long long available_tokens = mycase->rate*mycase->duration_s/mycase->period + mycase->capacity; double index=max_min_fairness_index(available_tokens, classes, n_class); if(mycase->print || index < 0.9) { printf("class\tweight\tdemand\tallocated\tideal\r\n"); for(size_t i=0; i 0.9) { } //continue; printf("%lld\t%lld\t%lld\t%lld\t%lld\r\n", classes[i].class_id, classes[i].weight, classes[i].demanded_tokens/mycase->duration_s, classes[i].allocated_tokens/mycase->duration_s, classes[i].ideal_tokens/mycase->duration_s); } printf("Replica %d, Perturb %lld ms, sync interval %d ms, fairness index %f\r\n", n_replica, mycase->perturb_interval_ms, sync_interval_ms, index); } for(int i=0; iduration_s*mycase->rate/mycase->period+mycase->capacity; struct bulk_token_bucket *btb[mycase->n_replica]; int n_replica=mycase->n_replica; int n_key=mycase->n_key; for(int i=0; irate, mycase->period, mycase->capacity, mycase->bucket_num); } struct btb_key bk[n_key]; memset(bk, 0, sizeof(bk)); long long key_base = 2192381923; for(int i=0; irate/mycase->period/2+((i+1)*mycase->rate); bk[i].allocated_tokens=0; if(bk[i].request_CIRrate/mycase->period) { bk[i].ideal_tokens=bk[i].request_CIR*mycase->duration_s; } else { bk[i].ideal_tokens=max_tokens; } } int sync_interval_ms=40; struct timeval last_sync=now; for(int i=0; iduration_s*(1000*1000/step_us); i++) { for(int j=0; jn_replica; int request = bk[j].request_CIR*step_us/(1000*1000); bk[j].allocated_tokens += bulk_token_bucket_consume(btb[r], now, (char *)&(bk[j].key), sizeof(bk[j].key), TB_CONSUME_FLEXIBLE, request); bk[j].requested_tokens += request; bk[j].request_cnt++; } if(timeval_delta_ms(last_sync, now) > sync_interval_ms) { btb_sync(btb, mycase->n_replica); last_sync=now; } timeradd(&now, &step, &now); } double index=0, ratio=0; long long more=0, less=0; for(int i=0; i 1.02) more++; if (ratio <0.98) less++; if(ratio>1.02 || ratio < 0.98) { //printf("%f\n", ratio); } } index=1-sqrt(index/n_key); EXPECT_GE(index, 0.9); struct bulk_token_bucket_info info; bulk_token_bucket_info(btb[0], now, &info); EXPECT_EQ(info.active_bucket_number, n_key); mycase->bucket_num=info.bucket_number; mycase->index=index; mycase->more=more; mycase->less=less; for(int i=0; i sync_interval_ms) { btb_sync(btb, n_replica); last_sync=now; } } upper_limit=CIR*timeval_delta_ms(start, now)/1000+CBS; struct bulk_token_bucket_info info; bulk_token_bucket_info(btb[0], now, &info); EXPECT_EQ(info.active_bucket_number, n_key); EXPECT_EQ(info.max_replicas, n_replica); struct bulk_token_bucket_key_info key_info; bulk_token_bucket_query(btb[1], now, key, strlen(key), &key_info); EXPECT_EQ(key_info.number_of_replica, 2); int success=0; for(int i=0; i0.95 && (double)allocated[i]/upper_limit <1.05) success++; } EXPECT_EQ(success, n_key); for(int i=0; i 1.05) more++; else success++; } index=1-sqrt(index/n_key); EXPECT_GE(index, 0.95); EXPECT_EQ(success, n_key); EXPECT_EQ(less, 0); EXPECT_EQ(more, 0); for(int i=0; i100) { last_compare=now; long long max_tokens = timeval_delta_ms(start, now)*rate/period/1000+capacity; if(allocated>max_tokens) { printf("allocated = %lld, max_tokens = %lld, elapse = %ld \r\n", allocated, max_tokens, timeval_delta_ms(start, now)/1000); } EXPECT_LE(allocated, max_tokens); } } EXPECT_LE(allocated, max_tokens); bulk_token_bucket_free(btb); } TEST(BulkTokenBucket, RetireBucket) { long long capacity=1024*1024*5; long long rate=capacity; long long period=1; long long duration_s=10; long long bucket_num=128; long long n_key=128; uuid_t uuid; struct timeval start, step, now, last_compare; int step_us=1*1000; gettimeofday(&start, NULL); last_compare=now=start; step.tv_sec=0; step.tv_usec=(suseconds_t)step_us; uuid_generate(uuid); struct bulk_token_bucket *btb=bulk_token_bucket_new(uuid, now, rate, period, capacity, bucket_num); while(timeval_delta_ms(start, now)