diff options
| author | 郑超 <[email protected]> | 2023-12-07 11:21:20 +0000 |
|---|---|---|
| committer | 郑超 <[email protected]> | 2023-12-07 11:21:20 +0000 |
| commit | 7332c66226edb71bfa210428571bb7ad49bcb2e6 (patch) | |
| tree | 4bee689e8e8e0503a96c9a9b665720bde73f885a | |
| parent | 4008e3509a4a500bed26fdcb4b03ce7d05cc77e2 (diff) | |
The staggered HyperLogLog does not slide during queries and merges, leading to inaccurate token assignments in the fair/bulk token bucket. Using obj_replicate instead of obj_new for a straightforward CRDT replication.v4.0.3
| -rw-r--r-- | CRDT/bulk_token_bucket.c | 23 | ||||
| -rw-r--r-- | CRDT/bulk_token_bucket.h | 1 | ||||
| -rw-r--r-- | CRDT/crdt_base_gtest.cpp | 130 | ||||
| -rw-r--r-- | CRDT/fair_token_bucket.c | 34 | ||||
| -rw-r--r-- | CRDT/fair_token_bucket.h | 1 | ||||
| -rw-r--r-- | CRDT/lww_register.c | 10 | ||||
| -rw-r--r-- | CRDT/lww_register.h | 3 | ||||
| -rw-r--r-- | CRDT/oc_token_bucket.c | 24 | ||||
| -rw-r--r-- | CRDT/oc_token_bucket.h | 3 | ||||
| -rw-r--r-- | CRDT/or_map.c | 10 | ||||
| -rw-r--r-- | CRDT/or_map.h | 1 | ||||
| -rw-r--r-- | CRDT/or_set.c | 12 | ||||
| -rw-r--r-- | CRDT/or_set.h | 1 | ||||
| -rw-r--r-- | CRDT/pn_counter.c | 8 | ||||
| -rw-r--r-- | CRDT/pn_counter.h | 1 | ||||
| -rw-r--r-- | CRDT/st_hyperloglog.c | 79 | ||||
| -rw-r--r-- | CRDT/st_hyperloglog.h | 1 | ||||
| -rw-r--r-- | docs/design.md | 3 | ||||
| -rw-r--r-- | src/swarmkv_store.c | 51 |
19 files changed, 292 insertions, 104 deletions
diff --git a/CRDT/bulk_token_bucket.c b/CRDT/bulk_token_bucket.c index b836286..b16e118 100644 --- a/CRDT/bulk_token_bucket.c +++ b/CRDT/bulk_token_bucket.c @@ -19,7 +19,7 @@ struct btb_configuration long long CBS; //Committed Burst Size long long refill_interval_ms; long long bucket_num; - struct timeval timestamp; + struct timeval last_cfg; }; struct refill_mark { @@ -29,7 +29,6 @@ struct refill_mark struct bulk_token_bucket { - uuid_t my_id; struct btb_configuration cfg; struct timeval start; long long perturb; @@ -53,7 +52,8 @@ struct bulk_token_bucket *bulk_token_bucket_new(uuid_t my_id, struct timeval now btb->cfg.CBS=CBS; btb->cfg.bucket_num=bucket_num; btb->cfg.refill_interval_ms=10; - memcpy(&btb->cfg.timestamp, &now, sizeof(btb->cfg.timestamp)); + memcpy(&btb->cfg.last_cfg, &now, sizeof(btb->cfg.last_cfg)); + btb->hll=ST_hyperloglog_new(9, 5, now); memcpy(&btb->start, &now, sizeof(btb->start)); btb->consumed=g_array_new(my_id, bucket_num); @@ -61,7 +61,6 @@ struct bulk_token_bucket *bulk_token_bucket_new(uuid_t my_id, struct timeval now btb->perturb=1; memcpy(&btb->perturb_timestamp, &now, sizeof(btb->perturb_timestamp)); memcpy(&btb->start, &now, sizeof(btb->start)); - uuid_copy(btb->my_id, my_id); return btb; } void bulk_token_bucket_free(struct bulk_token_bucket *btb) @@ -157,7 +156,7 @@ void bulk_token_bucket_configure(struct bulk_token_bucket *btb, struct timeval n btb->cfg.CIR=CIR; btb->cfg.CBS=CBS; btb->cfg.bucket_num=bucket_num; - memcpy(&btb->cfg.timestamp, &now, sizeof(btb->cfg.timestamp)); + memcpy(&btb->cfg.last_cfg, &now, sizeof(btb->cfg.last_cfg)); g_array_resize(btb->consumed, bucket_num); btb->refilled=realloc(btb->refilled, btb->cfg.bucket_num*sizeof(struct refill_mark)); memset(btb->refilled, 0, btb->cfg.bucket_num*sizeof(struct refill_mark)); @@ -169,6 +168,7 @@ void bulk_token_bucket_info(const struct bulk_token_bucket *btb, struct timeval info->CBS=btb->cfg.CBS; info->bucket_number=btb->cfg.bucket_num; info->replicas=g_array_replicas(btb->consumed); + ST_hyperloglog_step(btb->hll, now); info->estimate_keys=ST_hyperloglog_count(btb->hll); info->collision_rate=collision_probability(info->bucket_number, info->estimate_keys); return; @@ -241,9 +241,9 @@ struct bulk_token_bucket *bulk_token_bucket_deserialize(const char *blob, size_t } void bulk_token_bucket_merge(struct bulk_token_bucket *dst, const struct bulk_token_bucket *src) { - if(timercmp(&(dst->cfg.timestamp), &(src->cfg.timestamp), <))//Last-Write-Wins + if(timercmp(&(dst->cfg.last_cfg), &(src->cfg.last_cfg), <))//Last-Write-Wins { - bulk_token_bucket_configure(dst, src->cfg.timestamp, src->cfg.CIR, src->cfg.CBS, src->cfg.bucket_num); + bulk_token_bucket_configure(dst, src->cfg.last_cfg, src->cfg.CIR, src->cfg.CBS, src->cfg.bucket_num); } if(timercmp(&(dst->perturb_timestamp), &(src->perturb_timestamp), <))//Last-Write-Wins { @@ -271,6 +271,15 @@ void bulk_token_bucket_merge_blob(struct bulk_token_bucket *btb, const char * bl bulk_token_bucket_free(src); return; } +struct bulk_token_bucket *bulk_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct bulk_token_bucket *btb=bulk_token_bucket_deserialize(blob, blob_sz); + struct g_array *tmp=btb->consumed; + btb->consumed=g_array_new(uuid, btb->cfg.bucket_num); + g_array_merge(btb->consumed, tmp); + g_array_free(tmp); + return btb; +} size_t bulk_token_bucket_mem_size(const struct bulk_token_bucket *btb) { //to do diff --git a/CRDT/bulk_token_bucket.h b/CRDT/bulk_token_bucket.h index cce7324..5300b89 100644 --- a/CRDT/bulk_token_bucket.h +++ b/CRDT/bulk_token_bucket.h @@ -32,6 +32,7 @@ void bulk_token_bucket_serialize(const struct bulk_token_bucket *btb, char **blo struct bulk_token_bucket *bulk_token_bucket_deserialize(const char *blob, size_t blob_sz); void bulk_token_bucket_merge(struct bulk_token_bucket *dst, const struct bulk_token_bucket *src); void bulk_token_bucket_merge_blob(struct bulk_token_bucket *btb, const char * blob, size_t blob_sz); +struct bulk_token_bucket *bulk_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz); size_t bulk_token_bucket_mem_size(const struct bulk_token_bucket *btb); #ifdef __cplusplus } diff --git a/CRDT/crdt_base_gtest.cpp b/CRDT/crdt_base_gtest.cpp index 24aa476..0b9bf94 100644 --- a/CRDT/crdt_base_gtest.cpp +++ b/CRDT/crdt_base_gtest.cpp @@ -1725,6 +1725,136 @@ TEST(STHyperLogLog, Reconfigure) ST_hyperloglog_free(h[i]); } } +TEST(STHyperLogLog, EventualConsistency) +{ + struct timeval start, step, now; + gettimeofday(&start, NULL); + memcpy(&now, &start, sizeof(now)); + int n_replica=2; + struct ST_hyperloglog *h[n_replica]; + int time_window_s=10; + unsigned char precision=6; + for(int i=0; i<n_replica; i++) + { + h[i]=ST_hyperloglog_new(precision, time_window_s, start); + } + int key=1319823, j=0; + int n_add=0; + int add_per_step=100; + step.tv_sec=0; + step.tv_usec=1000; + int item_per_second=add_per_step*1000*1000/step.tv_usec; + while(now.tv_sec-start.tv_sec<time_window_s*5) + { + timeradd(&now, &step, &now); + for(int i=0; i<add_per_step; i++) + { + j=1+random()%(n_replica-1); + key++; + ST_hyperloglog_add(h[j], (const char *)&key, sizeof(key), now); + n_add++; + } + } + for(int i=0; i<n_replica; i++) + { + for(int j=0; j<n_replica; j++) + { + if(i==j) continue; + ST_hyperloglog_merge(h[i], h[j]); + } + } + double hll_count=0, error=0; + hll_count=ST_hyperloglog_count(h[0]); + error=ST_hyperloglog_error_for_precision(precision); + EXPECT_NEAR(hll_count, item_per_second*time_window_s, error*item_per_second*time_window_s); + + for(int i=0; i<n_replica; i++) + { + ST_hyperloglog_free(h[i]); + } +} +TEST(STHyperLogLog, Merge) +{ + struct timeval start, now; + gettimeofday(&start, NULL); + memcpy(&now, &start, sizeof(now)); + int n_replica=2; + struct ST_hyperloglog *h[n_replica]; + int time_window_s=10; + unsigned char precision=6; + for(int i=0; i<n_replica; i++) + { + h[i]=ST_hyperloglog_new(precision, time_window_s, start); + start.tv_sec+=time_window_s+1; + } + for(int i=0; i<n_replica; i++) + { + for(int j=0; j<n_replica; j++) + { + if(i==j) continue; + ST_hyperloglog_merge(h[i], h[j]); + } + } + for(int i=0; i<n_replica; i++) + { + ST_hyperloglog_free(h[i]); + } +} +TEST(STHyperLogLog, Step) +{ + struct timeval start, step, now; + gettimeofday(&start, NULL); + memcpy(&now, &start, sizeof(now)); + step.tv_sec=0; + step.tv_usec=10000; + int time_window_s=32; + unsigned char precision=8; + struct ST_hyperloglog *h=ST_hyperloglog_new(precision, time_window_s, start); + int key=1; + int add_per_step=1; + int item_per_second=add_per_step*1000*1000/step.tv_usec; + while(now.tv_sec-start.tv_sec<time_window_s*10) + { + timeradd(&now, &step, &now); + for(int i=0; i<add_per_step; i++) + { + key++; + ST_hyperloglog_add(h, (const char *)&key, sizeof(key), now); + } + } + double hll_count=0; + double error=ST_hyperloglog_error_for_precision(precision); + hll_count=ST_hyperloglog_count(h); + printf("time_window: %d, count: %d, error: %.2f \n", time_window_s, item_per_second*time_window_s, error); + for(int i=0; i<time_window_s; i++) + { + ST_hyperloglog_step(h, now); + hll_count=ST_hyperloglog_count(h); + printf("t+%d, estimate: %.2f\n", i, hll_count); + now.tv_sec++; + //EXPECT_NEAR(hll_count, item_per_second*(time_window_s-i), error*item_per_second*(time_window_s-i)); + } + ST_hyperloglog_free(h); + return; +/* + + hll_count=ST_hyperloglog_count(h); + EXPECT_NEAR(hll_count, item_per_second*time_window_s, error*item_per_second*time_window_s); + + now.tv_sec += time_window_s/12; + ST_hyperloglog_step(h, now); + hll_count=ST_hyperloglog_count(h); + EXPECT_NEAR(hll_count, item_per_second*time_window_s/4, error*item_per_second*time_window_s/2); + + + now.tv_sec += time_window_s/12; + ST_hyperloglog_step(h, now); + hll_count=ST_hyperloglog_count(h); + EXPECT_NEAR(hll_count, 0, error*item_per_second*time_window_s); + + ST_hyperloglog_free(h); + */ +} TEST(GArray, Basic) { uuid_t uuid; diff --git a/CRDT/fair_token_bucket.c b/CRDT/fair_token_bucket.c index 26a37ea..629227e 100644 --- a/CRDT/fair_token_bucket.c +++ b/CRDT/fair_token_bucket.c @@ -1,7 +1,6 @@ #include "fair_token_bucket.h" #include "oc_token_bucket.h" #include "st_hyperloglog.h" -//#include "g_array.h" #include "crdt_utils.h" #include "xxhash.h" @@ -17,7 +16,6 @@ struct fair_token_bucket { /* Sync Variables*/ - uuid_t uuid; long long divisor; struct timeval last_cfg; struct ST_hyperloglog *hll[FAIR_TB_WEIGHT_MAX]; //counting active keys @@ -38,16 +36,13 @@ struct fair_token_bucket *fair_token_bucket_new(uuid_t uuid, struct timeval now, { struct fair_token_bucket *ftb=ALLOC(struct fair_token_bucket, 1); - uuid_copy(ftb->uuid, uuid); ftb->divisor=divisor; for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++) { ftb->hll[i]=ST_hyperloglog_new(9, 5, now); } - //ftb->sfq=g_array_new(uuid, ftb->divisor); ftb->sfq=ALLOC(long long, divisor); ftb->bucket=OC_token_bucket_new(uuid, now, CIR, CBS); - //OC_token_bucket_set_no_reserve(ftb->bucket); ftb->per_weight_quantum=CIR; memcpy(&ftb->last_cfg, &now, sizeof(ftb->last_cfg)); return ftb; @@ -59,7 +54,6 @@ void fair_token_bucket_free(struct fair_token_bucket *ftb) { ST_hyperloglog_free(ftb->hll[i]); } - //g_array_free(ftb->sfq); free(ftb->sfq); OC_token_bucket_free(ftb->bucket); free(ftb); @@ -71,6 +65,7 @@ static void ftb_quantum_estimation(struct fair_token_bucket *ftb, struct timeval long long total_weight=0, n_active_key=0, count=0; for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++) { + ST_hyperloglog_step(ftb->hll[i], now); count = ST_hyperloglog_count(ftb->hll[i]); total_weight += (i+1)*count; n_active_key += count; @@ -93,7 +88,6 @@ static void ftb_quantum_estimation(struct fair_token_bucket *ftb, struct timeval static void ftb_perturb(struct fair_token_bucket *ftb, struct timeval now) { ftb_quantum_estimation(ftb, now); - //g_array_reset(ftb->sfq); memset(ftb->sfq, 0, sizeof(long long)*ftb->divisor); ftb->perturb_seed++; } @@ -115,7 +109,6 @@ long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeva sfq_idx=XXH3_64bits_withSeed(key, keylen, ftb->perturb_seed)%ftb->divisor; long long deficit_est=0; - //deficit_est=g_array_get(ftb->sfq, sfq_idx); deficit_est=ftb->sfq[sfq_idx]; deficit_est/=MAX(1, ftb->n_active_key/ftb->divisor); @@ -128,7 +121,6 @@ long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeva long long allocated_tokens=OC_token_bucket_consume(ftb->bucket, now, cmd, tokens); if(allocated_tokens) { - //g_array_incrby(ftb->sfq, sfq_idx, allocated_tokens); ftb->sfq[sfq_idx]+=allocated_tokens; } ftb->debug_last_allocated=allocated_tokens; @@ -138,7 +130,6 @@ void fair_token_bucket_configure(struct fair_token_bucket *ftb, struct timeval n { ftb->divisor=divisor; ftb->sfq=(long long*)realloc(ftb->sfq, sizeof(long long)*ftb->divisor); - //g_array_resize(ftb->sfq, ftb->divisor); memcpy(&ftb->last_cfg, &now, sizeof(ftb->last_cfg)); OC_token_bucket_configure(ftb->bucket, now, CIR, CBS); ftb_perturb(ftb, now); @@ -148,6 +139,7 @@ void fair_token_bucket_info(const struct fair_token_bucket *ftb, struct timeval { for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++) { + ST_hyperloglog_step(ftb->hll[i], now); info->active_key_number += ST_hyperloglog_count(ftb->hll[i]); } info->divisor = ftb->divisor; @@ -169,7 +161,6 @@ size_t fair_token_bucket_serialized_size(const struct fair_token_bucket *ftb) { sz += ST_hyperloglog_serialized_size(ftb->hll[i]); } -// sz += g_array_serialized_size(ftb->sfq); sz += OC_token_bucket_serialized_size(ftb->bucket); return sz; } @@ -194,13 +185,6 @@ void fair_token_bucket_serialize(const struct fair_token_bucket *ftb, char **blo offset += ST_hyperloglog_serialized_size(ftb->hll[i]); free(tmp_buff); } - /* - g_array_serialize(ftb->sfq, &tmp_buff, &tmp_sz); - memcpy(buffer+offset, tmp_buff, tmp_sz); - assert(tmp_sz==g_array_serialized_size(ftb->sfq)); - offset += g_array_serialized_size(ftb->sfq); - free(tmp_buff); - */ OC_token_bucket_serialize(ftb->bucket, &tmp_buff, &tmp_sz); memcpy(buffer+offset, tmp_buff, tmp_sz); offset += OC_token_bucket_serialized_size(ftb->bucket); @@ -226,10 +210,6 @@ struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t ftb->hll[i]=ST_hyperloglog_deserialize(blob+offset, blob_sz-offset); offset += ST_hyperloglog_serialized_size(ftb->hll[i]); } - /* - ftb->sfq=g_array_deserialize(blob+offset, blob_sz-offset); - offset += g_array_serialized_size(ftb->sfq); - */ ftb->bucket=OC_token_bucket_deserialize(blob+offset, blob_sz-offset); offset += OC_token_bucket_serialized_size(ftb->bucket); assert(offset==hdr.payload_sz); @@ -247,7 +227,6 @@ void fair_token_bucket_merge(struct fair_token_bucket *dst, const struct fair_to dst->sfq=(long long*)realloc(dst->sfq, sizeof(long long)*dst->divisor); memcpy(&dst->last_cfg, &src->last_cfg, sizeof(dst->last_cfg)); } - //g_array_merge(dst->sfq, src->sfq); OC_token_bucket_merge(dst->bucket, src->bucket); return; } @@ -258,6 +237,14 @@ void fair_token_bucket_merge_blob(struct fair_token_bucket *ftb, const char *blo fair_token_bucket_free(src); return; } +struct fair_token_bucket *fair_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct fair_token_bucket *ftb=fair_token_bucket_deserialize(blob, blob_sz); + OC_token_bucket_change_uuid(ftb->bucket, uuid); + ftb->sfq=ALLOC(long long, ftb->divisor); + return ftb; + +} size_t fair_token_bucket_mem_size(const struct fair_token_bucket *ftb) { size_t sz=0; @@ -266,7 +253,6 @@ size_t fair_token_bucket_mem_size(const struct fair_token_bucket *ftb) { sz += ST_hyperloglog_mem_size(ftb->hll[i]); } - //sz+=g_array_mem_size(ftb->sfq); sz+=sizeof(long long)*ftb->divisor; sz+=OC_token_bucket_mem_size(ftb->bucket); return sz; diff --git a/CRDT/fair_token_bucket.h b/CRDT/fair_token_bucket.h index 06bc17c..9b57fac 100644 --- a/CRDT/fair_token_bucket.h +++ b/CRDT/fair_token_bucket.h @@ -37,6 +37,7 @@ void fair_token_bucket_serialize(const struct fair_token_bucket *ftb, char **blo struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t blob_sz); void fair_token_bucket_merge(struct fair_token_bucket *dst, const struct fair_token_bucket *src); void fair_token_bucket_merge_blob(struct fair_token_bucket *ftb, const char *blob, size_t blob_sz); +struct fair_token_bucket *fair_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz); size_t fair_token_bucket_mem_size(const struct fair_token_bucket *ftb); #ifdef __cplusplus } diff --git a/CRDT/lww_register.c b/CRDT/lww_register.c index 7781735..5dbedb1 100644 --- a/CRDT/lww_register.c +++ b/CRDT/lww_register.c @@ -81,7 +81,15 @@ void LWW_register_merge_blob(struct LWW_register *reg, const char *blob, size_t LWW_register_free(to_merge); return; } -size_t LWW_regeister_mem_size(const struct LWW_register *reg) +struct LWW_register *LWW_register_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct LWW_register *src=LWW_register_deserialize(blob, blob_sz); + struct LWW_register *dst=LWW_register_new(uuid); + LWW_register_merge(dst, src); + LWW_register_free(src); + return dst; +} +size_t LWW_register_mem_size(const struct LWW_register *reg) { return (sizeof(struct LWW_register)+reg->size); } diff --git a/CRDT/lww_register.h b/CRDT/lww_register.h index 2217211..18865a0 100644 --- a/CRDT/lww_register.h +++ b/CRDT/lww_register.h @@ -27,7 +27,8 @@ struct LWW_register *LWW_register_deserialize(const char *blob, size_t blob_sz); void LWW_register_merge(struct LWW_register *dst, struct LWW_register *src); void LWW_register_merge_blob(struct LWW_register *reg, const char *blob, size_t blob_sz); -size_t LWW_regeister_mem_size(const struct LWW_register *reg); +struct LWW_register *LWW_register_replicate(uuid_t uuid, const char *blob, size_t blob_sz); +size_t LWW_register_mem_size(const struct LWW_register *reg); #ifdef __cplusplus } diff --git a/CRDT/oc_token_bucket.c b/CRDT/oc_token_bucket.c index be0b092..429dd33 100644 --- a/CRDT/oc_token_bucket.c +++ b/CRDT/oc_token_bucket.c @@ -12,7 +12,7 @@ struct OC_configuration long long CIR; //Committed Information Rate long long CBS; //Committed Burst Size long long refill_interval_ms; - struct timeval write_timestamp; + struct timeval last_cfg; }; #define timeval_to_ms(t) ((t).tv_sec*1000+(t).tv_usec/1000) struct OC_token_bucket @@ -31,8 +31,7 @@ struct OC_token_bucket *OC_token_bucket_new(uuid_t my_id, struct timeval now, lo { struct OC_token_bucket *bucket=ALLOC(struct OC_token_bucket, 1); bucket->consumed=PN_counter_new(my_id); - gettimeofday(&bucket->cfg.write_timestamp, NULL); - memcpy(&bucket->cfg.write_timestamp, &now, sizeof(bucket->cfg.write_timestamp)); + //memcpy(&bucket->cfg.last_cfg, &now, sizeof(bucket->cfg.last_cfg)); bucket->cfg.CIR=CIR; bucket->cfg.CBS=CBS; bucket->cfg.refill_interval_ms=OCTB_REFILL_INTERVAL_MS_DEFAULT; @@ -50,7 +49,7 @@ void OC_token_bucket_free(struct OC_token_bucket *bucket) } void OC_token_bucket_configure(struct OC_token_bucket *bucket, struct timeval now, long long CIR, long long CBS) { - memcpy(&bucket->cfg.write_timestamp, &now, sizeof(bucket->cfg.write_timestamp)); + memcpy(&bucket->cfg.last_cfg, &now, sizeof(bucket->cfg.last_cfg)); if(CIR>=0) bucket->cfg.CIR=CIR; if(CBS>=0) bucket->cfg.CBS=CBS; } @@ -156,7 +155,7 @@ struct OC_token_bucket *OC_token_bucket_deserialize(const char *blob, size_t blo } void OC_token_bucket_merge(struct OC_token_bucket *dst, const struct OC_token_bucket *src) { - if(timercmp(&(dst->cfg.write_timestamp), &(src->cfg.write_timestamp), <))//Last-Write-Wins + if(timercmp(&(dst->cfg.last_cfg), &(src->cfg.last_cfg), <))//Last-Write-Wins { memcpy(&dst->cfg, &src->cfg, sizeof(src->cfg)); } @@ -178,6 +177,21 @@ void OC_token_bucket_merge_blob(struct OC_token_bucket *bucket, const char *blob OC_token_bucket_free(to_merge); return; } + +void OC_token_bucket_change_uuid(struct OC_token_bucket *bucket, uuid_t new_uuid) +{ + struct PN_counter *tmp=bucket->consumed; + bucket->consumed=PN_counter_new(new_uuid); + PN_counter_merge(bucket->consumed, tmp); + PN_counter_free(tmp); + return; +} +struct OC_token_bucket *OC_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct OC_token_bucket *bucket=OC_token_bucket_deserialize(blob, blob_sz); + OC_token_bucket_change_uuid(bucket, uuid); + return bucket; +} size_t OC_token_bucket_mem_size(const struct OC_token_bucket *bucket) { size_t sz=0; diff --git a/CRDT/oc_token_bucket.h b/CRDT/oc_token_bucket.h index 26a0a32..8830fcd 100644 --- a/CRDT/oc_token_bucket.h +++ b/CRDT/oc_token_bucket.h @@ -32,7 +32,7 @@ struct OC_token_bucket_info void OC_token_bucket_info(struct OC_token_bucket *bucket, struct timeval now, struct OC_token_bucket_info *info); void OC_token_bucket_free(struct OC_token_bucket *bucket); void OC_token_bucket_set_no_reserve(struct OC_token_bucket *bucket); - +void OC_token_bucket_change_uuid(struct OC_token_bucket *bucket, uuid_t new_uuid); long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval now, enum tb_consume_type cmd, long long tokens); void OC_token_bucket_serialize(const struct OC_token_bucket *bucket, char **blob, size_t *blob_sz); @@ -40,6 +40,7 @@ struct OC_token_bucket *OC_token_bucket_deserialize(const char *blob, size_t blo void OC_token_bucket_merge(struct OC_token_bucket *dst, const struct OC_token_bucket *src); void OC_token_bucket_merge_blob(struct OC_token_bucket *bucket, const char *blob, size_t blob_sz); +struct OC_token_bucket *OC_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz); size_t OC_token_bucket_serialized_size(const struct OC_token_bucket *bucket); size_t OC_token_bucket_mem_size(const struct OC_token_bucket *bucket); #ifdef __cplusplus diff --git a/CRDT/or_map.c b/CRDT/or_map.c index 5315994..bc4cd6c 100644 --- a/CRDT/or_map.c +++ b/CRDT/or_map.c @@ -157,7 +157,7 @@ size_t OR_record_size(const struct OR_record *record) sz+=PN_counter_serialized_size(record->counter); break; case TYPE_STRING: - sz+=LWW_regeister_mem_size(record->string); + sz+=LWW_register_mem_size(record->string); break; default: break; @@ -747,6 +747,14 @@ void OR_map_merge_blob(struct OR_map *map, const char *blob, size_t blob_sz) OR_map_free(to_merge); return; } +struct OR_map *OR_map_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct OR_map *src=OR_map_deserialize(blob, blob_sz); + struct OR_map *dst=OR_map_new(uuid); + OR_map_merge(dst, src); + OR_map_free(src); + return dst; +} size_t OR_map_mem_size(const struct OR_map *map) { size_t sz=0; diff --git a/CRDT/or_map.h b/CRDT/or_map.h index 771448b..4a69e92 100644 --- a/CRDT/or_map.h +++ b/CRDT/or_map.h @@ -49,6 +49,7 @@ void OR_map_serialize(const struct OR_map *map, char **blob, size_t *blob_sz); struct OR_map *OR_map_deserialize(const char *blob, size_t blob_sz); void OR_map_merge(struct OR_map *dst, struct OR_map *src); void OR_map_merge_blob(struct OR_map *map, const char *blob, size_t blob_sz); +struct OR_map *OR_map_replicate(uuid_t uuid, const char *blob, size_t blob_sz); size_t OR_map_mem_size(const struct OR_map *map); #ifdef __cplusplus diff --git a/CRDT/or_set.c b/CRDT/or_set.c index d1ea936..0dc9054 100644 --- a/CRDT/or_set.c +++ b/CRDT/or_set.c @@ -93,11 +93,23 @@ void OR_set_serialize(const struct OR_set *set, char **blob, size_t *blob_sz) OR_map_serialize(set->map, blob, blob_sz); return; } +struct OR_set *OR_set_deserialize(const char *blob, size_t blob_sz) +{ + struct OR_set *set=ALLOC(struct OR_set, 1); + set->map=OR_map_deserialize(blob, blob_sz); + return set; +} void OR_set_merge_blob(struct OR_set *set, const char *blob, size_t blob_sz) { OR_map_merge_blob(set->map, blob, blob_sz); return; } +struct OR_set *OR_set_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct OR_set *dst=OR_set_new(uuid); + OR_set_merge_blob(dst, blob, blob_sz); + return dst; +} size_t OR_set_mem_size(const struct OR_set *set) { return sizeof(struct OR_set)+OR_map_mem_size(set->map); diff --git a/CRDT/or_set.h b/CRDT/or_set.h index 094683f..0293405 100644 --- a/CRDT/or_set.h +++ b/CRDT/or_set.h @@ -37,6 +37,7 @@ struct OR_set_member_list* OR_set_members(const struct OR_set *set); void OR_set_serialize(const struct OR_set *set, char **blob, size_t *blob_sz); void OR_set_merge_blob(struct OR_set *set, const char *blob, size_t blob_sz); +struct OR_set *OR_set_replicate(uuid_t uuid, const char *blob, size_t blob_sz); size_t OR_set_mem_size(const struct OR_set *set); #ifdef __cplusplus diff --git a/CRDT/pn_counter.c b/CRDT/pn_counter.c index d1431dd..1c69589 100644 --- a/CRDT/pn_counter.c +++ b/CRDT/pn_counter.c @@ -172,6 +172,14 @@ void PN_counter_merge_blob(struct PN_counter *pnc, const char *blob, size_t blob PN_counter_free(to_merge); return; } +struct PN_counter *PN_counter_replicate(uuid_t uuid, const char *blob, size_t blob_sz) +{ + struct PN_counter *src=PN_counter_deserialize(blob, blob_sz); + struct PN_counter *pnc=PN_counter_new(uuid); + PN_counter_merge(pnc, src); + PN_counter_free(src); + return pnc; +} size_t PN_counter_replica_num(const struct PN_counter *pnc) { return MAX(HASH_COUNT(pnc->hash_item), 1); diff --git a/CRDT/pn_counter.h b/CRDT/pn_counter.h index d56cf84..484832d 100644 --- a/CRDT/pn_counter.h +++ b/CRDT/pn_counter.h @@ -26,6 +26,7 @@ long long PN_counter_incrby(struct PN_counter *pnc, long long increment); void PN_counter_merge_blob(struct PN_counter *pnc, const char *blob, size_t blob_sz); void PN_counter_merge(struct PN_counter *dst, const struct PN_counter *src); +struct PN_counter *PN_counter_replicate(uuid_t uuid, const char *blob, size_t blob_sz); void PN_counter_serialize(const struct PN_counter *pnc, char **blob, size_t *blob_sz); struct PN_counter *PN_counter_deserialize(const char *blob, size_t blob_sz); size_t PN_counter_serialized_size(const struct PN_counter *pnc); diff --git a/CRDT/st_hyperloglog.c b/CRDT/st_hyperloglog.c index 42cb162..897c32a 100644 --- a/CRDT/st_hyperloglog.c +++ b/CRDT/st_hyperloglog.c @@ -19,7 +19,7 @@ #define NUM_REG(precision) ((1 << precision)) #define INT_CEIL(num, denom) (((num) + (denom) - 1) / (denom)) - +#define PRECISION_TO_WORD(precision) INT_CEIL(NUM_REG(precision), REG_PER_WORD) struct ST_HLL_configuration { @@ -48,15 +48,12 @@ struct ST_hyperloglog *ST_hyperloglog_new(unsigned char precision, int time_wind // Store precision h->cfg.precision = precision; h->cfg.time_window_s=time_window_seconds; - memcpy(&h->cfg.timestamp, &now, sizeof(h->cfg.timestamp)); + //memcpy(&h->cfg.timestamp, &now, sizeof(h->cfg.timestamp)); memcpy(&h->reset_time, &now, sizeof(h->reset_time)); - // Determine how many registers are needed - int num_reg = NUM_REG(precision); - // Get the full words required - int words = INT_CEIL(num_reg, REG_PER_WORD); + int words = PRECISION_TO_WORD(precision); // Allocate and zero out the registers h->registers = ALLOC(uint32_t, words); @@ -68,15 +65,11 @@ void ST_hyperloglog_configure(struct ST_hyperloglog *h, unsigned char precision, if(h->cfg.precision != precision) { free(h->registers); - // Determine how many registers are needed - int reg = NUM_REG(precision); - - // Get the full words required - int words = INT_CEIL(reg, REG_PER_WORD); + h->cfg.precision=precision; // Allocate and zero out the registers - h->registers = ALLOC(uint32_t, words); - h->cfg.precision=precision; + h->registers = ALLOC(uint32_t, PRECISION_TO_WORD(h->cfg.precision)); + } memcpy(&h->cfg.timestamp, &now, sizeof(h->cfg.timestamp)); return; @@ -139,10 +132,11 @@ static void periodic_reset(struct ST_hyperloglog *h, const struct timeval now) int num_reg=NUM_REG(h->cfg.precision); int reset_time_slot_us=h->cfg.time_window_s*2*1000*1000/num_reg; long long delta_us=timeval_delta_us(h->reset_time, now); + struct timeval step; - //reset_time_slot_us+=1000; step.tv_sec=reset_time_slot_us/1000/1000; step.tv_usec=reset_time_slot_us%(1000*1000); + if(delta_us>reset_time_slot_us) { for(int i=0; i<delta_us/reset_time_slot_us; i++) @@ -156,34 +150,65 @@ static void periodic_reset(struct ST_hyperloglog *h, const struct timeval now) int ST_hyperloglog_add(struct ST_hyperloglog *h, const char *key, size_t keylen, const struct timeval now) { periodic_reset(h, now); - // Compute the hash value of the key - uint64_t hash=0; + uint64_t hash=0; hash=XXH3_64bits_withSeed(key, keylen, 171); - // Add the hashed value - return hll_add_hash(h, hash); + return hll_add_hash(h, hash); +} +static struct ST_hyperloglog *ST_hyperloglog_copy(const struct ST_hyperloglog *src) +{ + struct ST_hyperloglog *dst=ST_hyperloglog_new(src->cfg.precision, src->cfg.time_window_s, src->reset_time); + size_t num_reg = NUM_REG(src->cfg.precision); + size_t words = INT_CEIL(num_reg, REG_PER_WORD); + + memcpy(dst->registers, src->registers, words*sizeof(int32_t)); + return dst; +} +void ST_hyperloglog_step(struct ST_hyperloglog *h, const struct timeval now) +{ + periodic_reset(h, now); + return; } void ST_hyperloglog_merge(struct ST_hyperloglog *dst, const struct ST_hyperloglog *src) { + if(timercmp(&(dst->cfg.timestamp), &(src->cfg.timestamp), <))//Last-Write-Wins { ST_hyperloglog_configure(dst, src->cfg.precision, src->cfg.time_window_s, src->cfg.timestamp); } if(dst->cfg.precision != src->cfg.precision) return; - int n_register=NUM_REG(dst->cfg.precision); - int s_reg=0, d_reg=0; - for(int i=0; i<n_register; i++) + if(dst->reset_time.tv_sec - src->reset_time.tv_sec > dst->cfg.time_window_s)//src is too old, no need to merge { - s_reg=get_register(src, i); - d_reg=get_register(dst, i); - set_register(dst, i, MAX(s_reg, d_reg)); + return; } - if(timercmp(&(dst->reset_time), &(src->reset_time), <))//Last-Write-Wins + else if(src->reset_time.tv_sec - dst->reset_time.tv_sec > dst->cfg.time_window_s) //dst is too old, just copy the src register { dst->reset_idx=src->reset_idx; memcpy(&dst->reset_time, &src->reset_time, sizeof(src->reset_time)); + memcpy(dst->registers, src->registers, PRECISION_TO_WORD(dst->cfg.precision)*sizeof(int32_t)); + } + else + { + struct ST_hyperloglog *tmp=ST_hyperloglog_copy(src);//create a copy of src for periodic reset + if(timercmp(&(dst->reset_time), &(tmp->reset_time), <))//correct with latest timestamp + { + periodic_reset(dst, tmp->reset_time); + } + else + { + periodic_reset(tmp, dst->reset_time); + } + int n_register=NUM_REG(dst->cfg.precision); + int s_reg=0, d_reg=0; + for(int i=0; i<n_register; i++) + { + + s_reg=get_register(tmp, i); + d_reg=get_register(dst, i); + set_register(dst, i, MAX(s_reg, d_reg)); + } + ST_hyperloglog_free(tmp); } - return; } size_t ST_hyperloglog_serialized_size(const struct ST_hyperloglog *h) @@ -235,7 +260,7 @@ void ST_hyperloglog_merge_blob(struct ST_hyperloglog *dst, const char *blob, siz ST_hyperloglog_free(src); return; } -double g_switchThreshold[15] = {10, 20, 40, 80, 220, 400, 900, 1800, 3100, 6500, +static double g_switchThreshold[15] = {10, 20, 40, 80, 220, 400, 900, 1800, 3100, 6500, 11500, 20000, 50000, 120000, 350000}; static double *g_rawEstimateData[] = { diff --git a/CRDT/st_hyperloglog.h b/CRDT/st_hyperloglog.h index 9da5c47..cb43c75 100644 --- a/CRDT/st_hyperloglog.h +++ b/CRDT/st_hyperloglog.h @@ -25,6 +25,7 @@ struct ST_hyperloglog *ST_hyperloglog_new(unsigned char precision, int time_wind void ST_hyperloglog_free(struct ST_hyperloglog *h); //Return 1 if at least 1 ST HyperLogLog internal register was altered. 0 otherwise. int ST_hyperloglog_add(struct ST_hyperloglog *h, const char *key, size_t keylen, const struct timeval now); +void ST_hyperloglog_step(struct ST_hyperloglog *h, const struct timeval now); double ST_hyperloglog_count(const struct ST_hyperloglog *h); size_t ST_hyperloglog_serialized_size(const struct ST_hyperloglog *h); void ST_hyperloglog_serialize(const struct ST_hyperloglog *h, char **blob, size_t *blob_sz); diff --git a/docs/design.md b/docs/design.md index 611ae1f..0bf5d0b 100644 --- a/docs/design.md +++ b/docs/design.md @@ -1,7 +1,8 @@ # Design -I list design choices here. +> Don't communicate by sharing memory; share memory by communicating. (Rob Pike) +I list design choices here. Following are terminologies used in Swarm KV. diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c index 987cfd4..79be1ab 100644 --- a/src/swarmkv_store.c +++ b/src/swarmkv_store.c @@ -21,36 +21,12 @@ struct swarmkv_obj_specs { enum sobj_type type; const char *type_name; - void *(*obj_new) (uuid_t uuid); void (*obj_free) (void *obj); void (*obj_serialize) (const void *obj, char **blob, size_t *blob_sz); void (*obj_merge_blob) (void *obj, const char *blob, size_t blob_sz); + void *(*obj_replicate) (uuid_t uuid, const char *blob, size_t blob_sz); size_t (*obj_size)(const void *obj); }; -static void *__wrap_OC_token_bucket_new(uuid_t uuid) -{ - struct OC_token_bucket *bucket=NULL; - struct timeval beginning_of_history; - memset(&beginning_of_history, 0, sizeof(beginning_of_history)); - bucket=OC_token_bucket_new(uuid, beginning_of_history, 0, 0); - return bucket; -} -static void *__wrap_fair_token_bucket_new(uuid_t uuid) -{ - struct fair_token_bucket *bucket=NULL; - struct timeval beginning_of_history; - memset(&beginning_of_history, 0, sizeof(beginning_of_history)); - bucket=fair_token_bucket_new(uuid, beginning_of_history, 0, 0, 0); - return bucket; -} -static void *__wrap_bulk_token_bucket_new(uuid_t uuid) -{ - struct bulk_token_bucket *bucket=NULL; - struct timeval beginning_of_history; - memset(&beginning_of_history, 0, sizeof(beginning_of_history)); - bucket=bulk_token_bucket_new(uuid, beginning_of_history, 0, 0, 0); - return bucket; -} size_t undefined_obj_mem_size(void *obj) { return 0; @@ -65,73 +41,73 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] = { .type=OBJ_TYPE_STRING, .type_name="string", - .obj_new=(void * (*)(unsigned char *))LWW_register_new, .obj_free=(void (*)(void *))LWW_register_free, .obj_serialize=(void (*)(const void *, char **, size_t *))LWW_register_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))LWW_register_merge_blob, - .obj_size=(size_t (*)(const void *))LWW_regeister_mem_size + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))LWW_register_replicate, + .obj_size=(size_t (*)(const void *))LWW_register_mem_size }, { .type=OBJ_TYPE_INTEGER, .type_name="integer", - .obj_new=(void * (*)(unsigned char *))PN_counter_new, .obj_free=(void (*)(void *))PN_counter_free, .obj_serialize=(void (*)(const void *, char **, size_t *))PN_counter_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))PN_counter_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))PN_counter_replicate, .obj_size=(size_t (*)(const void *))PN_counter_mem_size }, { .type=OBJ_TYPE_SET, .type_name="set", - .obj_new=(void * (*)(unsigned char *))OR_set_new, .obj_free=(void (*)(void *))OR_set_free, .obj_serialize=(void (*)(const void *, char **, size_t *))OR_set_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))OR_set_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))OR_set_replicate, .obj_size=(size_t (*)(const void *))OR_set_mem_size }, { .type=OBJ_TYPE_HASH, .type_name="hash", - .obj_new=(void * (*)(unsigned char *))OR_map_new, .obj_free=(void (*)(void *))OR_map_free, .obj_serialize=(void (*)(const void *, char **, size_t *))OR_map_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))OR_map_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))OR_map_replicate, .obj_size=(size_t (*)(const void *))OR_map_mem_size }, { .type=OBJ_TYPE_TOKEN_BUCKET, .type_name="token-bucket", - .obj_new=__wrap_OC_token_bucket_new, .obj_free=(void (*)(void *))OC_token_bucket_free, .obj_serialize=(void (*)(const void *, char **, size_t *))OC_token_bucket_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))OC_token_bucket_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))OC_token_bucket_replicate, .obj_size=(size_t (*)(const void *))OC_token_bucket_mem_size, }, { .type=OBJ_TYPE_FAIR_TOKEN_BUCKET, .type_name="fair-token-bucket", - .obj_new=__wrap_fair_token_bucket_new, .obj_free=(void (*)(void *))fair_token_bucket_free, .obj_serialize=(void (*)(const void *, char **, size_t *))fair_token_bucket_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))fair_token_bucket_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))fair_token_bucket_replicate, .obj_size=(size_t (*)(const void *))fair_token_bucket_mem_size, }, { .type=OBJ_TYPE_BULK_TOKEN_BUCKET, .type_name="bulk-token-bucket", - .obj_new=__wrap_bulk_token_bucket_new, .obj_free=(void (*)(void *))bulk_token_bucket_free, .obj_serialize=(void (*)(const void *, char **, size_t *))bulk_token_bucket_serialize, .obj_merge_blob=(void (*)(void *, const char *, size_t))bulk_token_bucket_merge_blob, + .obj_replicate=(void * (*)(uuid_t, const char *, size_t))bulk_token_bucket_replicate, .obj_size=(size_t (*)(const void *))bulk_token_bucket_mem_size }, { .type=OBJ_TYPE_UNDEFINED, .type_name="undefined", - .obj_new=NULL, .obj_free=undefined_obj_free, .obj_serialize=NULL, .obj_merge_blob=NULL, + .obj_replicate=NULL, .obj_size=(size_t (*)(const void *))undefined_obj_mem_size } }; @@ -440,9 +416,12 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t const char *value_blob=blob+offset; if(!obj->raw) { - obj->raw=sobj_specs[obj->type].obj_new(uuid); + obj->raw=sobj_specs[obj->type].obj_replicate(uuid, value_blob, value_blob_sz); + } + else + { + sobj_specs[obj->type].obj_merge_blob(obj->raw, value_blob, value_blob_sz); } - sobj_specs[obj->type].obj_merge_blob(obj->raw, value_blob, value_blob_sz); return; } |
