#include "oc_token_bucket.h" #include "fair_token_bucket.h" #include "bulk_token_bucket.h" #include "crdt_utils.h" #include #include //usleep #include #include void OC_token_bucket_sync(struct OC_token_bucket *list[], size_t n) { char *blob=NULL; size_t blob_sz=0; for(size_t i=0; i= eight_replica_num && scope_flag) { request_size = (long long)floor((long double)standard * 1.6) + rand_sd10; } else { request_size = (long long)floor((long double)standard * 1.6) - rand_sd10; } break; case HEAVY_TWO_EIGHT_TYPE: if (index < eight_replica_num && scope_flag) { request_size = (long long)floor((long double)standard * 0.6) + rand_sd10; } else if (index < eight_replica_num && !scope_flag) { request_size = (long long)floor((long double)standard * 0.6) - rand_sd10; } else if (index >= eight_replica_num && scope_flag) { request_size = (long long)floor((long double)standard * 4.8) + rand_sd10; } else { request_size = (long long)floor((long double)standard * 4.8) - rand_sd10; } break; case HEAVY_UNIFORM_EXTREME_TYPE: request_size = (long long)floor((long double)standard * 2); if (scope_flag) { request_size += rand_sd10; } else { request_size -= rand_sd10; } break; default: break; } return request_size; } void traffic_distribution(traffic_type type) { const int REPLICA_NUMBER=3; struct OC_token_bucket *buckets[REPLICA_NUMBER]; size_t i = 0, j = 0; long long CIR = 1*1024*1024; long long CBS = 2*1024*1024; uuid_t uuid; struct timeval start; gettimeofday(&start, NULL); for (i = 0; i < REPLICA_NUMBER; i++) { uuid_generate(uuid); buckets[i] = OC_token_bucket_new(uuid, start, CIR, CBS); } long long tokens = 0, flexible_tokens = 0; long long consumed = 0, requested = 0, upper_limit = 0; long long mimic_duration_us = (long long)100*1000*1000; long long step_us = 100; struct timeval step, now; memcpy(&now, &start, sizeof(now)); step.tv_sec = 0; step.tv_usec = (suseconds_t)step_us; for (i = 0; (long long)i < mimic_duration_us / step_us; i++) { j = i % 3; // sequence selection timeradd(&now, &step, &now); tokens = get_request_tokens(j, type, step_us, CIR, CBS); flexible_tokens = OC_token_bucket_consume(buckets[j], now, TB_CONSUME_FLEXIBLE, tokens); requested += tokens; consumed += flexible_tokens; if(i%100==0) { OC_token_bucket_sync(buckets, REPLICA_NUMBER); } } upper_limit = CBS + CIR * timeval_delta_ms(start, now) / 1000; struct OC_token_bucket_info info; OC_token_bucket_info(buckets[0], now, &info); EXPECT_LE(consumed, requested); double accuracy = (double)consumed / MIN(upper_limit, requested); EXPECT_NEAR(accuracy, 1, 0.02); printf("accuracy:%f, upper_limit:%lld, refilled:%lld, requested:%lld, consumed:%lld\n", accuracy, upper_limit, info.refilled, requested, consumed); for(i = 0; i < REPLICA_NUMBER; i++) { OC_token_bucket_free(buckets[i]); } } TEST(OCTokenBucket, TrafficTypeConsumer) { traffic_distribution(LIGHT_UNIFORM_TYPE); traffic_distribution(LIGHT_TWO_EIGHT_TYPE); traffic_distribution(HEAVY_TWO_EIGHT_TYPE); traffic_distribution(HEAVY_UNIFORM_EXTREME_TYPE); } TEST(OCTokenBucket, Basic) { uuid_t uuid; uuid_generate(uuid); struct OC_token_bucket *bucket=NULL; long long CIR=100; long long CBS=200; struct timeval now; gettimeofday(&now, NULL); bucket=OC_token_bucket_new(uuid, now, CIR, CBS); long long tokens=0; tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 140); EXPECT_EQ(tokens, 140); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 61); EXPECT_EQ(tokens, 0); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 60); EXPECT_EQ(tokens, 60); now.tv_sec++; tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 90); EXPECT_EQ(tokens, 90); struct OC_token_bucket_info info; OC_token_bucket_info(bucket, now, &info); EXPECT_GE(info.available, 10); // printf("avail=%lld\n", tokens); tokens=OC_token_bucket_consume(bucket, now, TB_CONSUME_NORMAL, 10); EXPECT_EQ(tokens, 10); OC_token_bucket_free(bucket); } TEST(OCTokenBucket, Serialize) { uuid_t uuid; int n_replica=4; long long CIR=100; long long CBS=200; struct timeval now; gettimeofday(&now, NULL); struct OC_token_bucket *b[n_replica], *b1=NULL; for(int i=0; imimic_duration_us/step_us/3 && (long long)i<= 2*mimic_duration_us/step_us/3) { continue; } //if(i%100==0) { OC_token_bucket_sync(buckets, REPLICA_NUMBER); } } upper_limit=CBS+(CIR*timeval_delta_ms(start, now)/1000)*2/3 + (CIR*timeval_delta_ms(start, now)/1000)*REPLICA_NUMBER/3; struct OC_token_bucket_info info; OC_token_bucket_info(buckets[0], now, &info); refilled = info.refilled; EXPECT_NEAR((double)consumed/MIN(refilled, requested), 1, 0.01); EXPECT_LE(consumed, requested); double accuracy=(double)consumed/MIN(upper_limit, requested); EXPECT_NEAR(accuracy, 1, 0.01); for(i=0; idemand_tokens-rb->demand_tokens); } double max_min_fairness_index(long long available_tokens, struct sftb_class * classes, size_t n_class) { qsort(classes, n_class, sizeof(struct sftb_class), cmp_sftb_class); long long total_weight=0; for(size_t i=0; i0 ) { long long share=left_tokens/left_weight; for(size_t i=0; i 0.9) continue; printf("%lld\t%lld\t%lld\t%lld\t%lld\r\n", classes[i].class_id, classes[i].weight, classes[i].demand_tokens/duration_s, classes[i].allocated_tokens/duration_s, classes[i].ideal_tokens/duration_s); } printf("CIR %lld fairness index %f\r\n", CIR, index); } for(int i=0; iduration_s*mycase->CIR+mycase->CBS; struct bulk_token_bucket *btb[mycase->n_replica]; int n_replica=mycase->n_replica; int key_num=mycase->key_num; for(int i=0; iCIR, mycase->CBS, mycase->bucket_num); } struct btb_key bk[key_num]; for(int i=0; iCIR/2+(i*mycase->CIR)/key_num; bk[i].allocated_tokens=0; if(bk[i].request_CIRCIR) { bk[i].ideal_tokens=bk[i].request_CIR*mycase->duration_s; } else { bk[i].ideal_tokens=max_tokens; } } int sync_interval_ms=200; for(int i=0; iduration_s*(1000*1000/step_us); i++) { timeradd(&now, &step, &now); for(int j=0; jn_replica; bk[j].allocated_tokens+=bulk_token_bucket_consume(btb[r], now, (char *)&(bk[j].key), sizeof(bk[j].key), TB_CONSUME_FLEXIBLE, bk[j].request_CIR*step_us/(1000*1000)); } if(0==i%(sync_interval_ms*1000/step_us)) { btb_sync(btb, mycase->n_replica); } } double index=0, ratio=0; long long more=0, less=0; for(int i=0; i 1.02) more++; if (ratio <0.98) less++; if(ratio>1.02 || ratio < 0.98) { //printf("%f\n", ratio); } } index=1-sqrt(index/key_num); struct bulk_token_bucket_info info; bulk_token_bucket_info(btb[0], now, &info); mycase->index=index; mycase->estimate_keys=info.estimate_keys; mycase->more=more; mycase->less=less; mycase->collision_rate=info.collision_rate; for(int i=0; i0.95 &&(double)allocated[i]/upper_limit <1.05) success++; bulk_token_bucket_free(btb[i]); } EXPECT_EQ(success, n_replica); } TEST(BulkTokenBucket, RareCollision) { int n_case=7; struct btb_case test[n_case]; for(int i=0; i