diff options
| author | Zheng Chao <[email protected]> | 2023-03-11 14:51:20 +0800 |
|---|---|---|
| committer | Zheng Chao <[email protected]> | 2023-03-11 14:51:20 +0800 |
| commit | cff79ab47cc7c4da9d5773aeadb833ca252b14b5 (patch) | |
| tree | 0dbb75b1fcb8c48bd06ea5bdbc30c2e400fa67ae | |
| parent | c0cbee9eaacb24762d307f0b4157ffaed6932a16 (diff) | |
:bug: Fix inaccuarte `TINFO` when a token bucket has a zero CIR and zero CBS.
| -rw-r--r-- | CRDT/crdt_gtest.cpp | 53 | ||||
| -rw-r--r-- | CRDT/oc_token_bucket.c | 21 | ||||
| -rw-r--r-- | docs/commands.md | 2 | ||||
| -rw-r--r-- | test/swarmkv_gtest.cpp | 18 |
4 files changed, 85 insertions, 9 deletions
diff --git a/CRDT/crdt_gtest.cpp b/CRDT/crdt_gtest.cpp index 97dcab6..3166d3c 100644 --- a/CRDT/crdt_gtest.cpp +++ b/CRDT/crdt_gtest.cpp @@ -184,14 +184,65 @@ TEST(OCTokenBucket, Basic) EXPECT_EQ(tokens, 10); OC_token_bucket_free(bucket); + +} +TEST(OCTokenBucket, Boundary) +{ + uuid_t uuid; + uuid_generate(uuid); + + struct timeval now; gettimeofday(&now, NULL); - bucket=OC_token_bucket_new(uuid, now, 0, 0); + struct OC_token_bucket *bucket=NULL; + long long tokens=0, consumed=0; + + //Zero CIR + bucket=OC_token_bucket_new(uuid, now, 0, 1000); + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_CONSUME_NORMAL, 1000); + EXPECT_EQ(tokens, 1000); + + now.tv_sec++; + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_CONSUME_NORMAL, 1); + EXPECT_EQ(tokens, 0); + OC_token_bucket_free(bucket); + //Zero CBS + bucket=OC_token_bucket_new(uuid, now, 1000, 0); + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_CONSUME_NORMAL, 1); + EXPECT_EQ(tokens, 0); + OC_token_bucket_free(bucket); + + //Infinite Tokens + bucket=OC_token_bucket_new(uuid, now, 0, 0); tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_CONSUME_NORMAL, 140); + consumed+=tokens; EXPECT_EQ(tokens, 140); + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_CONSUME_NORMAL, 65535); + consumed+=tokens; EXPECT_EQ(tokens, 65535); + + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_READ_CONSUEMD, 0); + EXPECT_EQ(tokens, consumed); + + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_READ_REFILLED, 0); + EXPECT_EQ(tokens, consumed); + + now.tv_sec++; + OC_token_bucket_configure(bucket, now, 100, 500, 10); + + now.tv_sec+=500/100; + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_CONSUME_NORMAL, 500); + consumed+=tokens; + EXPECT_EQ(tokens, 500); + + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_READ_CONSUEMD, 0); + EXPECT_EQ(tokens, consumed); + + tokens=OC_token_bucket_control(bucket, now, OCTB_CMD_READ_REFILLED, 0); + EXPECT_EQ(tokens, consumed); + OC_token_bucket_free(bucket); } TEST(OCTokenBucket, Merge) diff --git a/CRDT/oc_token_bucket.c b/CRDT/oc_token_bucket.c index f7285aa..66af171 100644 --- a/CRDT/oc_token_bucket.c +++ b/CRDT/oc_token_bucket.c @@ -59,10 +59,12 @@ long long OC_token_bucket_control(struct OC_token_bucket *bucket, struct timeval assert(refilled>=0); assert(consumed>=0); int refill_flag=0; - + int infinite_flag=0; if(bucket->cfg.CBS==0 && bucket->cfg.CIR==0) { - cmd=OCTB_CMD_CONSUME_FORCE; + infinite_flag=1; + refilled+=tokens; + refill_flag=1; } to_add=bucket->cfg.CIR*delta_time_ms/1000; @@ -93,16 +95,23 @@ long long OC_token_bucket_control(struct OC_token_bucket *bucket, struct timeval switch(cmd) { case OCTB_CMD_CONSUME_AS_MUCH_AS_POSSIBLE: - assigned=global_available; + assigned=infinite_flag?tokens:global_available; break; case OCTB_CMD_CONSUME_FORCE: assigned=tokens; break; - case OCTB_CMD_CONSUME_FLEXIBLE: - assigned=MIN(tokens, local_available); + case OCTB_CMD_CONSUME_FLEXIBLE: + assigned=infinite_flag?tokens:MIN(tokens, local_available);; break; case OCTB_CMD_CONSUME_NORMAL: - assigned=(tokens<=local_available) ? tokens:0; + if(infinite_flag) + { + assigned=tokens; + } + else + { + assigned=(tokens<=local_available) ? tokens:0; + } break; case OCTB_CMD_READ_AVAILABLE: return global_available; diff --git a/docs/commands.md b/docs/commands.md index b37479f..b619dd4 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -375,7 +375,7 @@ Syntax TCFG key capacity rate ``` -Config a token bucket of `key`, which has a fixed `capacity` bucket and has tokens are added at a fixed `rate`. If `key` does not exist, a new key holding a token bucket is created. +Config a token bucket of `key`, which has a fixed `capacity` bucket and has tokens are added at a fixed `rate`. If `key` does not exist, a new key holding a token bucket is created. If both capacity and rate are 0, the token bucket has infinite tokens. Return diff --git a/test/swarmkv_gtest.cpp b/test/swarmkv_gtest.cpp index 1213d98..042f1a0 100644 --- a/test/swarmkv_gtest.cpp +++ b/test/swarmkv_gtest.cpp @@ -324,7 +324,6 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket) while(now.tv_sec - start.tv_sec<3) { request_tokens=random()%(2*rate); - cmd_exec_arg_expect_integer(arg, 0); reply=swarmkv_command(db, "TCONSUME %s %lld FLEXIBLE", key, request_tokens); if(reply->type==SWARMKV_REPLY_INTEGER) { @@ -336,6 +335,23 @@ TEST_F(SwarmkvBasicTest, TypeTokenBucket) } EXPECT_LE(got_tokens, (now.tv_sec -start.tv_sec)*rate+capacity); cmd_exec_arg_free(arg); + + //Infinite tokens + reply=swarmkv_command(db, "TCFG %s 0 0", key); + EXPECT_EQ(reply->type, SWARMKV_REPLY_STATUS); + swarmkv_reply_free(reply); + long long t=0; + for(i=0; i<100; i++) + { + reply=swarmkv_command(db, "TCONSUME %s 10000", key); + t+=reply->integer; + swarmkv_reply_free(reply); + } + EXPECT_EQ(t, 10000*i); + reply=swarmkv_command(db, "TINFO %s", key); + ASSERT_EQ(reply->n_element, 10); + EXPECT_EQ(reply->elements[5]->integer, got_tokens+t); + swarmkv_reply_free(reply); } TEST_F(SwarmkvBasicTest, TypeHash) { |
