summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: 郑超 <[email protected]> 2023-12-07 11:21:20 +0000
committer: 郑超 <[email protected]> 2023-12-07 11:21:20 +0000
commit: 7332c66226edb71bfa210428571bb7ad49bcb2e6 (patch)
tree: 4bee689e8e8e0503a96c9a9b665720bde73f885a
parent: 4008e3509a4a500bed26fdcb4b03ce7d05cc77e2 (diff)
The staggered HyperLogLog does not slide during queries and merges, leading to inaccurate token assignments in the fair/bulk token buckets. Use obj_replicate instead of obj_new for straightforward CRDT replication. (v4.0.3)
-rw-r--r-- CRDT/bulk_token_bucket.c 23
-rw-r--r-- CRDT/bulk_token_bucket.h 1
-rw-r--r-- CRDT/crdt_base_gtest.cpp 130
-rw-r--r-- CRDT/fair_token_bucket.c 34
-rw-r--r-- CRDT/fair_token_bucket.h 1
-rw-r--r-- CRDT/lww_register.c 10
-rw-r--r-- CRDT/lww_register.h 3
-rw-r--r-- CRDT/oc_token_bucket.c 24
-rw-r--r-- CRDT/oc_token_bucket.h 3
-rw-r--r-- CRDT/or_map.c 10
-rw-r--r-- CRDT/or_map.h 1
-rw-r--r-- CRDT/or_set.c 12
-rw-r--r-- CRDT/or_set.h 1
-rw-r--r-- CRDT/pn_counter.c 8
-rw-r--r-- CRDT/pn_counter.h 1
-rw-r--r-- CRDT/st_hyperloglog.c 79
-rw-r--r-- CRDT/st_hyperloglog.h 1
-rw-r--r-- docs/design.md 3
-rw-r--r-- src/swarmkv_store.c 51
19 files changed, 292 insertions, 104 deletions
diff --git a/CRDT/bulk_token_bucket.c b/CRDT/bulk_token_bucket.c
index b836286..b16e118 100644
--- a/CRDT/bulk_token_bucket.c
+++ b/CRDT/bulk_token_bucket.c
@@ -19,7 +19,7 @@ struct btb_configuration
long long CBS; //Committed Burst Size
long long refill_interval_ms;
long long bucket_num;
- struct timeval timestamp;
+ struct timeval last_cfg;
};
struct refill_mark
{
@@ -29,7 +29,6 @@ struct refill_mark
struct bulk_token_bucket
{
- uuid_t my_id;
struct btb_configuration cfg;
struct timeval start;
long long perturb;
@@ -53,7 +52,8 @@ struct bulk_token_bucket *bulk_token_bucket_new(uuid_t my_id, struct timeval now
btb->cfg.CBS=CBS;
btb->cfg.bucket_num=bucket_num;
btb->cfg.refill_interval_ms=10;
- memcpy(&btb->cfg.timestamp, &now, sizeof(btb->cfg.timestamp));
+ memcpy(&btb->cfg.last_cfg, &now, sizeof(btb->cfg.last_cfg));
+
btb->hll=ST_hyperloglog_new(9, 5, now);
memcpy(&btb->start, &now, sizeof(btb->start));
btb->consumed=g_array_new(my_id, bucket_num);
@@ -61,7 +61,6 @@ struct bulk_token_bucket *bulk_token_bucket_new(uuid_t my_id, struct timeval now
btb->perturb=1;
memcpy(&btb->perturb_timestamp, &now, sizeof(btb->perturb_timestamp));
memcpy(&btb->start, &now, sizeof(btb->start));
- uuid_copy(btb->my_id, my_id);
return btb;
}
void bulk_token_bucket_free(struct bulk_token_bucket *btb)
@@ -157,7 +156,7 @@ void bulk_token_bucket_configure(struct bulk_token_bucket *btb, struct timeval n
btb->cfg.CIR=CIR;
btb->cfg.CBS=CBS;
btb->cfg.bucket_num=bucket_num;
- memcpy(&btb->cfg.timestamp, &now, sizeof(btb->cfg.timestamp));
+ memcpy(&btb->cfg.last_cfg, &now, sizeof(btb->cfg.last_cfg));
g_array_resize(btb->consumed, bucket_num);
btb->refilled=realloc(btb->refilled, btb->cfg.bucket_num*sizeof(struct refill_mark));
memset(btb->refilled, 0, btb->cfg.bucket_num*sizeof(struct refill_mark));
@@ -169,6 +168,7 @@ void bulk_token_bucket_info(const struct bulk_token_bucket *btb, struct timeval
info->CBS=btb->cfg.CBS;
info->bucket_number=btb->cfg.bucket_num;
info->replicas=g_array_replicas(btb->consumed);
+ ST_hyperloglog_step(btb->hll, now);
info->estimate_keys=ST_hyperloglog_count(btb->hll);
info->collision_rate=collision_probability(info->bucket_number, info->estimate_keys);
return;
@@ -241,9 +241,9 @@ struct bulk_token_bucket *bulk_token_bucket_deserialize(const char *blob, size_t
}
void bulk_token_bucket_merge(struct bulk_token_bucket *dst, const struct bulk_token_bucket *src)
{
- if(timercmp(&(dst->cfg.timestamp), &(src->cfg.timestamp), <))//Last-Write-Wins
+ if(timercmp(&(dst->cfg.last_cfg), &(src->cfg.last_cfg), <))//Last-Write-Wins
{
- bulk_token_bucket_configure(dst, src->cfg.timestamp, src->cfg.CIR, src->cfg.CBS, src->cfg.bucket_num);
+ bulk_token_bucket_configure(dst, src->cfg.last_cfg, src->cfg.CIR, src->cfg.CBS, src->cfg.bucket_num);
}
if(timercmp(&(dst->perturb_timestamp), &(src->perturb_timestamp), <))//Last-Write-Wins
{
@@ -271,6 +271,15 @@ void bulk_token_bucket_merge_blob(struct bulk_token_bucket *btb, const char * bl
bulk_token_bucket_free(src);
return;
}
+struct bulk_token_bucket *bulk_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct bulk_token_bucket *btb=bulk_token_bucket_deserialize(blob, blob_sz);
+ struct g_array *tmp=btb->consumed;
+ btb->consumed=g_array_new(uuid, btb->cfg.bucket_num);
+ g_array_merge(btb->consumed, tmp);
+ g_array_free(tmp);
+ return btb;
+}
size_t bulk_token_bucket_mem_size(const struct bulk_token_bucket *btb)
{
//to do
diff --git a/CRDT/bulk_token_bucket.h b/CRDT/bulk_token_bucket.h
index cce7324..5300b89 100644
--- a/CRDT/bulk_token_bucket.h
+++ b/CRDT/bulk_token_bucket.h
@@ -32,6 +32,7 @@ void bulk_token_bucket_serialize(const struct bulk_token_bucket *btb, char **blo
struct bulk_token_bucket *bulk_token_bucket_deserialize(const char *blob, size_t blob_sz);
void bulk_token_bucket_merge(struct bulk_token_bucket *dst, const struct bulk_token_bucket *src);
void bulk_token_bucket_merge_blob(struct bulk_token_bucket *btb, const char * blob, size_t blob_sz);
+struct bulk_token_bucket *bulk_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
size_t bulk_token_bucket_mem_size(const struct bulk_token_bucket *btb);
#ifdef __cplusplus
}
diff --git a/CRDT/crdt_base_gtest.cpp b/CRDT/crdt_base_gtest.cpp
index 24aa476..0b9bf94 100644
--- a/CRDT/crdt_base_gtest.cpp
+++ b/CRDT/crdt_base_gtest.cpp
@@ -1725,6 +1725,136 @@ TEST(STHyperLogLog, Reconfigure)
ST_hyperloglog_free(h[i]);
}
}
+TEST(STHyperLogLog, EventualConsistency)
+{
+ struct timeval start, step, now;
+ gettimeofday(&start, NULL);
+ memcpy(&now, &start, sizeof(now));
+ int n_replica=2;
+ struct ST_hyperloglog *h[n_replica];
+ int time_window_s=10;
+ unsigned char precision=6;
+ for(int i=0; i<n_replica; i++)
+ {
+ h[i]=ST_hyperloglog_new(precision, time_window_s, start);
+ }
+ int key=1319823, j=0;
+ int n_add=0;
+ int add_per_step=100;
+ step.tv_sec=0;
+ step.tv_usec=1000;
+ int item_per_second=add_per_step*1000*1000/step.tv_usec;
+ while(now.tv_sec-start.tv_sec<time_window_s*5)
+ {
+ timeradd(&now, &step, &now);
+ for(int i=0; i<add_per_step; i++)
+ {
+ j=1+random()%(n_replica-1);
+ key++;
+ ST_hyperloglog_add(h[j], (const char *)&key, sizeof(key), now);
+ n_add++;
+ }
+ }
+ for(int i=0; i<n_replica; i++)
+ {
+ for(int j=0; j<n_replica; j++)
+ {
+ if(i==j) continue;
+ ST_hyperloglog_merge(h[i], h[j]);
+ }
+ }
+ double hll_count=0, error=0;
+ hll_count=ST_hyperloglog_count(h[0]);
+ error=ST_hyperloglog_error_for_precision(precision);
+ EXPECT_NEAR(hll_count, item_per_second*time_window_s, error*item_per_second*time_window_s);
+
+ for(int i=0; i<n_replica; i++)
+ {
+ ST_hyperloglog_free(h[i]);
+ }
+}
+TEST(STHyperLogLog, Merge)
+{
+ struct timeval start, now;
+ gettimeofday(&start, NULL);
+ memcpy(&now, &start, sizeof(now));
+ int n_replica=2;
+ struct ST_hyperloglog *h[n_replica];
+ int time_window_s=10;
+ unsigned char precision=6;
+ for(int i=0; i<n_replica; i++)
+ {
+ h[i]=ST_hyperloglog_new(precision, time_window_s, start);
+ start.tv_sec+=time_window_s+1;
+ }
+ for(int i=0; i<n_replica; i++)
+ {
+ for(int j=0; j<n_replica; j++)
+ {
+ if(i==j) continue;
+ ST_hyperloglog_merge(h[i], h[j]);
+ }
+ }
+ for(int i=0; i<n_replica; i++)
+ {
+ ST_hyperloglog_free(h[i]);
+ }
+}
+TEST(STHyperLogLog, Step)
+{
+ struct timeval start, step, now;
+ gettimeofday(&start, NULL);
+ memcpy(&now, &start, sizeof(now));
+ step.tv_sec=0;
+ step.tv_usec=10000;
+ int time_window_s=32;
+ unsigned char precision=8;
+ struct ST_hyperloglog *h=ST_hyperloglog_new(precision, time_window_s, start);
+ int key=1;
+ int add_per_step=1;
+ int item_per_second=add_per_step*1000*1000/step.tv_usec;
+ while(now.tv_sec-start.tv_sec<time_window_s*10)
+ {
+ timeradd(&now, &step, &now);
+ for(int i=0; i<add_per_step; i++)
+ {
+ key++;
+ ST_hyperloglog_add(h, (const char *)&key, sizeof(key), now);
+ }
+ }
+ double hll_count=0;
+ double error=ST_hyperloglog_error_for_precision(precision);
+ hll_count=ST_hyperloglog_count(h);
+ printf("time_window: %d, count: %d, error: %.2f \n", time_window_s, item_per_second*time_window_s, error);
+ for(int i=0; i<time_window_s; i++)
+ {
+ ST_hyperloglog_step(h, now);
+ hll_count=ST_hyperloglog_count(h);
+ printf("t+%d, estimate: %.2f\n", i, hll_count);
+ now.tv_sec++;
+ //EXPECT_NEAR(hll_count, item_per_second*(time_window_s-i), error*item_per_second*(time_window_s-i));
+ }
+ ST_hyperloglog_free(h);
+ return;
+/*
+
+ hll_count=ST_hyperloglog_count(h);
+ EXPECT_NEAR(hll_count, item_per_second*time_window_s, error*item_per_second*time_window_s);
+
+ now.tv_sec += time_window_s/12;
+ ST_hyperloglog_step(h, now);
+ hll_count=ST_hyperloglog_count(h);
+ EXPECT_NEAR(hll_count, item_per_second*time_window_s/4, error*item_per_second*time_window_s/2);
+
+
+ now.tv_sec += time_window_s/12;
+ ST_hyperloglog_step(h, now);
+ hll_count=ST_hyperloglog_count(h);
+ EXPECT_NEAR(hll_count, 0, error*item_per_second*time_window_s);
+
+ ST_hyperloglog_free(h);
+ */
+}
TEST(GArray, Basic)
{
uuid_t uuid;
diff --git a/CRDT/fair_token_bucket.c b/CRDT/fair_token_bucket.c
index 26a37ea..629227e 100644
--- a/CRDT/fair_token_bucket.c
+++ b/CRDT/fair_token_bucket.c
@@ -1,7 +1,6 @@
#include "fair_token_bucket.h"
#include "oc_token_bucket.h"
#include "st_hyperloglog.h"
-//#include "g_array.h"
#include "crdt_utils.h"
#include "xxhash.h"
@@ -17,7 +16,6 @@
struct fair_token_bucket
{
/* Sync Variables*/
- uuid_t uuid;
long long divisor;
struct timeval last_cfg;
struct ST_hyperloglog *hll[FAIR_TB_WEIGHT_MAX]; //counting active keys
@@ -38,16 +36,13 @@ struct fair_token_bucket *fair_token_bucket_new(uuid_t uuid, struct timeval now,
{
struct fair_token_bucket *ftb=ALLOC(struct fair_token_bucket, 1);
- uuid_copy(ftb->uuid, uuid);
ftb->divisor=divisor;
for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++)
{
ftb->hll[i]=ST_hyperloglog_new(9, 5, now);
}
- //ftb->sfq=g_array_new(uuid, ftb->divisor);
ftb->sfq=ALLOC(long long, divisor);
ftb->bucket=OC_token_bucket_new(uuid, now, CIR, CBS);
- //OC_token_bucket_set_no_reserve(ftb->bucket);
ftb->per_weight_quantum=CIR;
memcpy(&ftb->last_cfg, &now, sizeof(ftb->last_cfg));
return ftb;
@@ -59,7 +54,6 @@ void fair_token_bucket_free(struct fair_token_bucket *ftb)
{
ST_hyperloglog_free(ftb->hll[i]);
}
- //g_array_free(ftb->sfq);
free(ftb->sfq);
OC_token_bucket_free(ftb->bucket);
free(ftb);
@@ -71,6 +65,7 @@ static void ftb_quantum_estimation(struct fair_token_bucket *ftb, struct timeval
long long total_weight=0, n_active_key=0, count=0;
for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++)
{
+ ST_hyperloglog_step(ftb->hll[i], now);
count = ST_hyperloglog_count(ftb->hll[i]);
total_weight += (i+1)*count;
n_active_key += count;
@@ -93,7 +88,6 @@ static void ftb_quantum_estimation(struct fair_token_bucket *ftb, struct timeval
static void ftb_perturb(struct fair_token_bucket *ftb, struct timeval now)
{
ftb_quantum_estimation(ftb, now);
- //g_array_reset(ftb->sfq);
memset(ftb->sfq, 0, sizeof(long long)*ftb->divisor);
ftb->perturb_seed++;
}
@@ -115,7 +109,6 @@ long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeva
sfq_idx=XXH3_64bits_withSeed(key, keylen, ftb->perturb_seed)%ftb->divisor;
long long deficit_est=0;
- //deficit_est=g_array_get(ftb->sfq, sfq_idx);
deficit_est=ftb->sfq[sfq_idx];
deficit_est/=MAX(1, ftb->n_active_key/ftb->divisor);
@@ -128,7 +121,6 @@ long long fair_token_bucket_consume(struct fair_token_bucket *ftb, struct timeva
long long allocated_tokens=OC_token_bucket_consume(ftb->bucket, now, cmd, tokens);
if(allocated_tokens)
{
- //g_array_incrby(ftb->sfq, sfq_idx, allocated_tokens);
ftb->sfq[sfq_idx]+=allocated_tokens;
}
ftb->debug_last_allocated=allocated_tokens;
@@ -138,7 +130,6 @@ void fair_token_bucket_configure(struct fair_token_bucket *ftb, struct timeval n
{
ftb->divisor=divisor;
ftb->sfq=(long long*)realloc(ftb->sfq, sizeof(long long)*ftb->divisor);
- //g_array_resize(ftb->sfq, ftb->divisor);
memcpy(&ftb->last_cfg, &now, sizeof(ftb->last_cfg));
OC_token_bucket_configure(ftb->bucket, now, CIR, CBS);
ftb_perturb(ftb, now);
@@ -148,6 +139,7 @@ void fair_token_bucket_info(const struct fair_token_bucket *ftb, struct timeval
{
for(int i=0; i<FAIR_TB_WEIGHT_MAX; i++)
{
+ ST_hyperloglog_step(ftb->hll[i], now);
info->active_key_number += ST_hyperloglog_count(ftb->hll[i]);
}
info->divisor = ftb->divisor;
@@ -169,7 +161,6 @@ size_t fair_token_bucket_serialized_size(const struct fair_token_bucket *ftb)
{
sz += ST_hyperloglog_serialized_size(ftb->hll[i]);
}
-// sz += g_array_serialized_size(ftb->sfq);
sz += OC_token_bucket_serialized_size(ftb->bucket);
return sz;
}
@@ -194,13 +185,6 @@ void fair_token_bucket_serialize(const struct fair_token_bucket *ftb, char **blo
offset += ST_hyperloglog_serialized_size(ftb->hll[i]);
free(tmp_buff);
}
- /*
- g_array_serialize(ftb->sfq, &tmp_buff, &tmp_sz);
- memcpy(buffer+offset, tmp_buff, tmp_sz);
- assert(tmp_sz==g_array_serialized_size(ftb->sfq));
- offset += g_array_serialized_size(ftb->sfq);
- free(tmp_buff);
- */
OC_token_bucket_serialize(ftb->bucket, &tmp_buff, &tmp_sz);
memcpy(buffer+offset, tmp_buff, tmp_sz);
offset += OC_token_bucket_serialized_size(ftb->bucket);
@@ -226,10 +210,6 @@ struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t
ftb->hll[i]=ST_hyperloglog_deserialize(blob+offset, blob_sz-offset);
offset += ST_hyperloglog_serialized_size(ftb->hll[i]);
}
- /*
- ftb->sfq=g_array_deserialize(blob+offset, blob_sz-offset);
- offset += g_array_serialized_size(ftb->sfq);
- */
ftb->bucket=OC_token_bucket_deserialize(blob+offset, blob_sz-offset);
offset += OC_token_bucket_serialized_size(ftb->bucket);
assert(offset==hdr.payload_sz);
@@ -247,7 +227,6 @@ void fair_token_bucket_merge(struct fair_token_bucket *dst, const struct fair_to
dst->sfq=(long long*)realloc(dst->sfq, sizeof(long long)*dst->divisor);
memcpy(&dst->last_cfg, &src->last_cfg, sizeof(dst->last_cfg));
}
- //g_array_merge(dst->sfq, src->sfq);
OC_token_bucket_merge(dst->bucket, src->bucket);
return;
}
@@ -258,6 +237,14 @@ void fair_token_bucket_merge_blob(struct fair_token_bucket *ftb, const char *blo
fair_token_bucket_free(src);
return;
}
+struct fair_token_bucket *fair_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct fair_token_bucket *ftb=fair_token_bucket_deserialize(blob, blob_sz);
+ OC_token_bucket_change_uuid(ftb->bucket, uuid);
+ ftb->sfq=ALLOC(long long, ftb->divisor);
+ return ftb;
+
+}
size_t fair_token_bucket_mem_size(const struct fair_token_bucket *ftb)
{
size_t sz=0;
@@ -266,7 +253,6 @@ size_t fair_token_bucket_mem_size(const struct fair_token_bucket *ftb)
{
sz += ST_hyperloglog_mem_size(ftb->hll[i]);
}
- //sz+=g_array_mem_size(ftb->sfq);
sz+=sizeof(long long)*ftb->divisor;
sz+=OC_token_bucket_mem_size(ftb->bucket);
return sz;
diff --git a/CRDT/fair_token_bucket.h b/CRDT/fair_token_bucket.h
index 06bc17c..9b57fac 100644
--- a/CRDT/fair_token_bucket.h
+++ b/CRDT/fair_token_bucket.h
@@ -37,6 +37,7 @@ void fair_token_bucket_serialize(const struct fair_token_bucket *ftb, char **blo
struct fair_token_bucket *fair_token_bucket_deserialize(const char *blob, size_t blob_sz);
void fair_token_bucket_merge(struct fair_token_bucket *dst, const struct fair_token_bucket *src);
void fair_token_bucket_merge_blob(struct fair_token_bucket *ftb, const char *blob, size_t blob_sz);
+struct fair_token_bucket *fair_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
size_t fair_token_bucket_mem_size(const struct fair_token_bucket *ftb);
#ifdef __cplusplus
}
diff --git a/CRDT/lww_register.c b/CRDT/lww_register.c
index 7781735..5dbedb1 100644
--- a/CRDT/lww_register.c
+++ b/CRDT/lww_register.c
@@ -81,7 +81,15 @@ void LWW_register_merge_blob(struct LWW_register *reg, const char *blob, size_t
LWW_register_free(to_merge);
return;
}
-size_t LWW_regeister_mem_size(const struct LWW_register *reg)
+struct LWW_register *LWW_register_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct LWW_register *src=LWW_register_deserialize(blob, blob_sz);
+ struct LWW_register *dst=LWW_register_new(uuid);
+ LWW_register_merge(dst, src);
+ LWW_register_free(src);
+ return dst;
+}
+size_t LWW_register_mem_size(const struct LWW_register *reg)
{
return (sizeof(struct LWW_register)+reg->size);
}
diff --git a/CRDT/lww_register.h b/CRDT/lww_register.h
index 2217211..18865a0 100644
--- a/CRDT/lww_register.h
+++ b/CRDT/lww_register.h
@@ -27,7 +27,8 @@ struct LWW_register *LWW_register_deserialize(const char *blob, size_t blob_sz);
void LWW_register_merge(struct LWW_register *dst, struct LWW_register *src);
void LWW_register_merge_blob(struct LWW_register *reg, const char *blob, size_t blob_sz);
-size_t LWW_regeister_mem_size(const struct LWW_register *reg);
+struct LWW_register *LWW_register_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
+size_t LWW_register_mem_size(const struct LWW_register *reg);
#ifdef __cplusplus
}
diff --git a/CRDT/oc_token_bucket.c b/CRDT/oc_token_bucket.c
index be0b092..429dd33 100644
--- a/CRDT/oc_token_bucket.c
+++ b/CRDT/oc_token_bucket.c
@@ -12,7 +12,7 @@ struct OC_configuration
long long CIR; //Committed Information Rate
long long CBS; //Committed Burst Size
long long refill_interval_ms;
- struct timeval write_timestamp;
+ struct timeval last_cfg;
};
#define timeval_to_ms(t) ((t).tv_sec*1000+(t).tv_usec/1000)
struct OC_token_bucket
@@ -31,8 +31,7 @@ struct OC_token_bucket *OC_token_bucket_new(uuid_t my_id, struct timeval now, lo
{
struct OC_token_bucket *bucket=ALLOC(struct OC_token_bucket, 1);
bucket->consumed=PN_counter_new(my_id);
- gettimeofday(&bucket->cfg.write_timestamp, NULL);
- memcpy(&bucket->cfg.write_timestamp, &now, sizeof(bucket->cfg.write_timestamp));
+ //memcpy(&bucket->cfg.last_cfg, &now, sizeof(bucket->cfg.last_cfg));
bucket->cfg.CIR=CIR;
bucket->cfg.CBS=CBS;
bucket->cfg.refill_interval_ms=OCTB_REFILL_INTERVAL_MS_DEFAULT;
@@ -50,7 +49,7 @@ void OC_token_bucket_free(struct OC_token_bucket *bucket)
}
void OC_token_bucket_configure(struct OC_token_bucket *bucket, struct timeval now, long long CIR, long long CBS)
{
- memcpy(&bucket->cfg.write_timestamp, &now, sizeof(bucket->cfg.write_timestamp));
+ memcpy(&bucket->cfg.last_cfg, &now, sizeof(bucket->cfg.last_cfg));
if(CIR>=0) bucket->cfg.CIR=CIR;
if(CBS>=0) bucket->cfg.CBS=CBS;
}
@@ -156,7 +155,7 @@ struct OC_token_bucket *OC_token_bucket_deserialize(const char *blob, size_t blo
}
void OC_token_bucket_merge(struct OC_token_bucket *dst, const struct OC_token_bucket *src)
{
- if(timercmp(&(dst->cfg.write_timestamp), &(src->cfg.write_timestamp), <))//Last-Write-Wins
+ if(timercmp(&(dst->cfg.last_cfg), &(src->cfg.last_cfg), <))//Last-Write-Wins
{
memcpy(&dst->cfg, &src->cfg, sizeof(src->cfg));
}
@@ -178,6 +177,21 @@ void OC_token_bucket_merge_blob(struct OC_token_bucket *bucket, const char *blob
OC_token_bucket_free(to_merge);
return;
}
+
+void OC_token_bucket_change_uuid(struct OC_token_bucket *bucket, uuid_t new_uuid)
+{
+ struct PN_counter *tmp=bucket->consumed;
+ bucket->consumed=PN_counter_new(new_uuid);
+ PN_counter_merge(bucket->consumed, tmp);
+ PN_counter_free(tmp);
+ return;
+}
+struct OC_token_bucket *OC_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct OC_token_bucket *bucket=OC_token_bucket_deserialize(blob, blob_sz);
+ OC_token_bucket_change_uuid(bucket, uuid);
+ return bucket;
+}
size_t OC_token_bucket_mem_size(const struct OC_token_bucket *bucket)
{
size_t sz=0;
diff --git a/CRDT/oc_token_bucket.h b/CRDT/oc_token_bucket.h
index 26a0a32..8830fcd 100644
--- a/CRDT/oc_token_bucket.h
+++ b/CRDT/oc_token_bucket.h
@@ -32,7 +32,7 @@ struct OC_token_bucket_info
void OC_token_bucket_info(struct OC_token_bucket *bucket, struct timeval now, struct OC_token_bucket_info *info);
void OC_token_bucket_free(struct OC_token_bucket *bucket);
void OC_token_bucket_set_no_reserve(struct OC_token_bucket *bucket);
-
+void OC_token_bucket_change_uuid(struct OC_token_bucket *bucket, uuid_t new_uuid);
long long OC_token_bucket_consume(struct OC_token_bucket *bucket, struct timeval now, enum tb_consume_type cmd, long long tokens);
void OC_token_bucket_serialize(const struct OC_token_bucket *bucket, char **blob, size_t *blob_sz);
@@ -40,6 +40,7 @@ struct OC_token_bucket *OC_token_bucket_deserialize(const char *blob, size_t blo
void OC_token_bucket_merge(struct OC_token_bucket *dst, const struct OC_token_bucket *src);
void OC_token_bucket_merge_blob(struct OC_token_bucket *bucket, const char *blob, size_t blob_sz);
+struct OC_token_bucket *OC_token_bucket_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
size_t OC_token_bucket_serialized_size(const struct OC_token_bucket *bucket);
size_t OC_token_bucket_mem_size(const struct OC_token_bucket *bucket);
#ifdef __cplusplus
diff --git a/CRDT/or_map.c b/CRDT/or_map.c
index 5315994..bc4cd6c 100644
--- a/CRDT/or_map.c
+++ b/CRDT/or_map.c
@@ -157,7 +157,7 @@ size_t OR_record_size(const struct OR_record *record)
sz+=PN_counter_serialized_size(record->counter);
break;
case TYPE_STRING:
- sz+=LWW_regeister_mem_size(record->string);
+ sz+=LWW_register_mem_size(record->string);
break;
default:
break;
@@ -747,6 +747,14 @@ void OR_map_merge_blob(struct OR_map *map, const char *blob, size_t blob_sz)
OR_map_free(to_merge);
return;
}
+struct OR_map *OR_map_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct OR_map *src=OR_map_deserialize(blob, blob_sz);
+ struct OR_map *dst=OR_map_new(uuid);
+ OR_map_merge(dst, src);
+ OR_map_free(src);
+ return dst;
+}
size_t OR_map_mem_size(const struct OR_map *map)
{
size_t sz=0;
diff --git a/CRDT/or_map.h b/CRDT/or_map.h
index 771448b..4a69e92 100644
--- a/CRDT/or_map.h
+++ b/CRDT/or_map.h
@@ -49,6 +49,7 @@ void OR_map_serialize(const struct OR_map *map, char **blob, size_t *blob_sz);
struct OR_map *OR_map_deserialize(const char *blob, size_t blob_sz);
void OR_map_merge(struct OR_map *dst, struct OR_map *src);
void OR_map_merge_blob(struct OR_map *map, const char *blob, size_t blob_sz);
+struct OR_map *OR_map_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
size_t OR_map_mem_size(const struct OR_map *map);
#ifdef __cplusplus
diff --git a/CRDT/or_set.c b/CRDT/or_set.c
index d1ea936..0dc9054 100644
--- a/CRDT/or_set.c
+++ b/CRDT/or_set.c
@@ -93,11 +93,23 @@ void OR_set_serialize(const struct OR_set *set, char **blob, size_t *blob_sz)
OR_map_serialize(set->map, blob, blob_sz);
return;
}
+struct OR_set *OR_set_deserialize(const char *blob, size_t blob_sz)
+{
+ struct OR_set *set=ALLOC(struct OR_set, 1);
+ set->map=OR_map_deserialize(blob, blob_sz);
+ return set;
+}
void OR_set_merge_blob(struct OR_set *set, const char *blob, size_t blob_sz)
{
OR_map_merge_blob(set->map, blob, blob_sz);
return;
}
+struct OR_set *OR_set_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct OR_set *dst=OR_set_new(uuid);
+ OR_set_merge_blob(dst, blob, blob_sz);
+ return dst;
+}
size_t OR_set_mem_size(const struct OR_set *set)
{
return sizeof(struct OR_set)+OR_map_mem_size(set->map);
diff --git a/CRDT/or_set.h b/CRDT/or_set.h
index 094683f..0293405 100644
--- a/CRDT/or_set.h
+++ b/CRDT/or_set.h
@@ -37,6 +37,7 @@ struct OR_set_member_list* OR_set_members(const struct OR_set *set);
void OR_set_serialize(const struct OR_set *set, char **blob, size_t *blob_sz);
void OR_set_merge_blob(struct OR_set *set, const char *blob, size_t blob_sz);
+struct OR_set *OR_set_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
size_t OR_set_mem_size(const struct OR_set *set);
#ifdef __cplusplus
diff --git a/CRDT/pn_counter.c b/CRDT/pn_counter.c
index d1431dd..1c69589 100644
--- a/CRDT/pn_counter.c
+++ b/CRDT/pn_counter.c
@@ -172,6 +172,14 @@ void PN_counter_merge_blob(struct PN_counter *pnc, const char *blob, size_t blob
PN_counter_free(to_merge);
return;
}
+struct PN_counter *PN_counter_replicate(uuid_t uuid, const char *blob, size_t blob_sz)
+{
+ struct PN_counter *src=PN_counter_deserialize(blob, blob_sz);
+ struct PN_counter *pnc=PN_counter_new(uuid);
+ PN_counter_merge(pnc, src);
+ PN_counter_free(src);
+ return pnc;
+}
size_t PN_counter_replica_num(const struct PN_counter *pnc)
{
return MAX(HASH_COUNT(pnc->hash_item), 1);
diff --git a/CRDT/pn_counter.h b/CRDT/pn_counter.h
index d56cf84..484832d 100644
--- a/CRDT/pn_counter.h
+++ b/CRDT/pn_counter.h
@@ -26,6 +26,7 @@ long long PN_counter_incrby(struct PN_counter *pnc, long long increment);
void PN_counter_merge_blob(struct PN_counter *pnc, const char *blob, size_t blob_sz);
void PN_counter_merge(struct PN_counter *dst, const struct PN_counter *src);
+struct PN_counter *PN_counter_replicate(uuid_t uuid, const char *blob, size_t blob_sz);
void PN_counter_serialize(const struct PN_counter *pnc, char **blob, size_t *blob_sz);
struct PN_counter *PN_counter_deserialize(const char *blob, size_t blob_sz);
size_t PN_counter_serialized_size(const struct PN_counter *pnc);
diff --git a/CRDT/st_hyperloglog.c b/CRDT/st_hyperloglog.c
index 42cb162..897c32a 100644
--- a/CRDT/st_hyperloglog.c
+++ b/CRDT/st_hyperloglog.c
@@ -19,7 +19,7 @@
#define NUM_REG(precision) ((1 << precision))
#define INT_CEIL(num, denom) (((num) + (denom) - 1) / (denom))
-
+#define PRECISION_TO_WORD(precision) INT_CEIL(NUM_REG(precision), REG_PER_WORD)
struct ST_HLL_configuration
{
@@ -48,15 +48,12 @@ struct ST_hyperloglog *ST_hyperloglog_new(unsigned char precision, int time_wind
// Store precision
h->cfg.precision = precision;
h->cfg.time_window_s=time_window_seconds;
- memcpy(&h->cfg.timestamp, &now, sizeof(h->cfg.timestamp));
+ //memcpy(&h->cfg.timestamp, &now, sizeof(h->cfg.timestamp));
memcpy(&h->reset_time, &now, sizeof(h->reset_time));
- // Determine how many registers are needed
- int num_reg = NUM_REG(precision);
- // Get the full words required
- int words = INT_CEIL(num_reg, REG_PER_WORD);
+ int words = PRECISION_TO_WORD(precision);
// Allocate and zero out the registers
h->registers = ALLOC(uint32_t, words);
@@ -68,15 +65,11 @@ void ST_hyperloglog_configure(struct ST_hyperloglog *h, unsigned char precision,
if(h->cfg.precision != precision)
{
free(h->registers);
- // Determine how many registers are needed
- int reg = NUM_REG(precision);
-
- // Get the full words required
- int words = INT_CEIL(reg, REG_PER_WORD);
+ h->cfg.precision=precision;
// Allocate and zero out the registers
- h->registers = ALLOC(uint32_t, words);
- h->cfg.precision=precision;
+ h->registers = ALLOC(uint32_t, PRECISION_TO_WORD(h->cfg.precision));
+
}
memcpy(&h->cfg.timestamp, &now, sizeof(h->cfg.timestamp));
return;
@@ -139,10 +132,11 @@ static void periodic_reset(struct ST_hyperloglog *h, const struct timeval now)
int num_reg=NUM_REG(h->cfg.precision);
int reset_time_slot_us=h->cfg.time_window_s*2*1000*1000/num_reg;
long long delta_us=timeval_delta_us(h->reset_time, now);
+
struct timeval step;
- //reset_time_slot_us+=1000;
step.tv_sec=reset_time_slot_us/1000/1000;
step.tv_usec=reset_time_slot_us%(1000*1000);
+
if(delta_us>reset_time_slot_us)
{
for(int i=0; i<delta_us/reset_time_slot_us; i++)
@@ -156,34 +150,65 @@ static void periodic_reset(struct ST_hyperloglog *h, const struct timeval now)
int ST_hyperloglog_add(struct ST_hyperloglog *h, const char *key, size_t keylen, const struct timeval now)
{
periodic_reset(h, now);
- // Compute the hash value of the key
- uint64_t hash=0;
+ uint64_t hash=0;
hash=XXH3_64bits_withSeed(key, keylen, 171);
- // Add the hashed value
- return hll_add_hash(h, hash);
+ return hll_add_hash(h, hash);
+}
+static struct ST_hyperloglog *ST_hyperloglog_copy(const struct ST_hyperloglog *src)
+{
+ struct ST_hyperloglog *dst=ST_hyperloglog_new(src->cfg.precision, src->cfg.time_window_s, src->reset_time);
+ size_t num_reg = NUM_REG(src->cfg.precision);
+ size_t words = INT_CEIL(num_reg, REG_PER_WORD);
+
+ memcpy(dst->registers, src->registers, words*sizeof(int32_t));
+ return dst;
+}
+void ST_hyperloglog_step(struct ST_hyperloglog *h, const struct timeval now)
+{
+ periodic_reset(h, now);
+ return;
}
void ST_hyperloglog_merge(struct ST_hyperloglog *dst, const struct ST_hyperloglog *src)
{
+
if(timercmp(&(dst->cfg.timestamp), &(src->cfg.timestamp), <))//Last-Write-Wins
{
ST_hyperloglog_configure(dst, src->cfg.precision, src->cfg.time_window_s, src->cfg.timestamp);
}
if(dst->cfg.precision != src->cfg.precision) return;
- int n_register=NUM_REG(dst->cfg.precision);
- int s_reg=0, d_reg=0;
- for(int i=0; i<n_register; i++)
+ if(dst->reset_time.tv_sec - src->reset_time.tv_sec > dst->cfg.time_window_s)//src is too old, no need to merge
{
- s_reg=get_register(src, i);
- d_reg=get_register(dst, i);
- set_register(dst, i, MAX(s_reg, d_reg));
+ return;
}
- if(timercmp(&(dst->reset_time), &(src->reset_time), <))//Last-Write-Wins
+ else if(src->reset_time.tv_sec - dst->reset_time.tv_sec > dst->cfg.time_window_s) //dst is too old, just copy the src register
{
dst->reset_idx=src->reset_idx;
memcpy(&dst->reset_time, &src->reset_time, sizeof(src->reset_time));
+ memcpy(dst->registers, src->registers, PRECISION_TO_WORD(dst->cfg.precision)*sizeof(int32_t));
+ }
+ else
+ {
+ struct ST_hyperloglog *tmp=ST_hyperloglog_copy(src);//create a copy of src for periodic reset
+ if(timercmp(&(dst->reset_time), &(tmp->reset_time), <))//correct with latest timestamp
+ {
+ periodic_reset(dst, tmp->reset_time);
+ }
+ else
+ {
+ periodic_reset(tmp, dst->reset_time);
+ }
+ int n_register=NUM_REG(dst->cfg.precision);
+ int s_reg=0, d_reg=0;
+ for(int i=0; i<n_register; i++)
+ {
+
+ s_reg=get_register(tmp, i);
+ d_reg=get_register(dst, i);
+ set_register(dst, i, MAX(s_reg, d_reg));
+ }
+ ST_hyperloglog_free(tmp);
}
-
return;
}
size_t ST_hyperloglog_serialized_size(const struct ST_hyperloglog *h)
@@ -235,7 +260,7 @@ void ST_hyperloglog_merge_blob(struct ST_hyperloglog *dst, const char *blob, siz
ST_hyperloglog_free(src);
return;
}
-double g_switchThreshold[15] = {10, 20, 40, 80, 220, 400, 900, 1800, 3100, 6500,
+static double g_switchThreshold[15] = {10, 20, 40, 80, 220, 400, 900, 1800, 3100, 6500,
11500, 20000, 50000, 120000, 350000};
static double *g_rawEstimateData[] = {
diff --git a/CRDT/st_hyperloglog.h b/CRDT/st_hyperloglog.h
index 9da5c47..cb43c75 100644
--- a/CRDT/st_hyperloglog.h
+++ b/CRDT/st_hyperloglog.h
@@ -25,6 +25,7 @@ struct ST_hyperloglog *ST_hyperloglog_new(unsigned char precision, int time_wind
void ST_hyperloglog_free(struct ST_hyperloglog *h);
//Return 1 if at least 1 ST HyperLogLog internal register was altered. 0 otherwise.
int ST_hyperloglog_add(struct ST_hyperloglog *h, const char *key, size_t keylen, const struct timeval now);
+void ST_hyperloglog_step(struct ST_hyperloglog *h, const struct timeval now);
double ST_hyperloglog_count(const struct ST_hyperloglog *h);
size_t ST_hyperloglog_serialized_size(const struct ST_hyperloglog *h);
void ST_hyperloglog_serialize(const struct ST_hyperloglog *h, char **blob, size_t *blob_sz);
diff --git a/docs/design.md b/docs/design.md
index 611ae1f..0bf5d0b 100644
--- a/docs/design.md
+++ b/docs/design.md
@@ -1,7 +1,8 @@
# Design
-I list design choices here.
+> Don't communicate by sharing memory; share memory by communicating. (Rob Pike)
+I list design choices here.
Following are terminologies used in Swarm KV.
diff --git a/src/swarmkv_store.c b/src/swarmkv_store.c
index 987cfd4..79be1ab 100644
--- a/src/swarmkv_store.c
+++ b/src/swarmkv_store.c
@@ -21,36 +21,12 @@ struct swarmkv_obj_specs
{
enum sobj_type type;
const char *type_name;
- void *(*obj_new) (uuid_t uuid);
void (*obj_free) (void *obj);
void (*obj_serialize) (const void *obj, char **blob, size_t *blob_sz);
void (*obj_merge_blob) (void *obj, const char *blob, size_t blob_sz);
+ void *(*obj_replicate) (uuid_t uuid, const char *blob, size_t blob_sz);
size_t (*obj_size)(const void *obj);
};
-static void *__wrap_OC_token_bucket_new(uuid_t uuid)
-{
- struct OC_token_bucket *bucket=NULL;
- struct timeval beginning_of_history;
- memset(&beginning_of_history, 0, sizeof(beginning_of_history));
- bucket=OC_token_bucket_new(uuid, beginning_of_history, 0, 0);
- return bucket;
-}
-static void *__wrap_fair_token_bucket_new(uuid_t uuid)
-{
- struct fair_token_bucket *bucket=NULL;
- struct timeval beginning_of_history;
- memset(&beginning_of_history, 0, sizeof(beginning_of_history));
- bucket=fair_token_bucket_new(uuid, beginning_of_history, 0, 0, 0);
- return bucket;
-}
-static void *__wrap_bulk_token_bucket_new(uuid_t uuid)
-{
- struct bulk_token_bucket *bucket=NULL;
- struct timeval beginning_of_history;
- memset(&beginning_of_history, 0, sizeof(beginning_of_history));
- bucket=bulk_token_bucket_new(uuid, beginning_of_history, 0, 0, 0);
- return bucket;
-}
size_t undefined_obj_mem_size(void *obj)
{
return 0;
@@ -65,73 +41,73 @@ struct swarmkv_obj_specs sobj_specs[__SWARMKV_OBJ_TYPE_MAX] =
{
.type=OBJ_TYPE_STRING,
.type_name="string",
- .obj_new=(void * (*)(unsigned char *))LWW_register_new,
.obj_free=(void (*)(void *))LWW_register_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))LWW_register_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))LWW_register_merge_blob,
- .obj_size=(size_t (*)(const void *))LWW_regeister_mem_size
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))LWW_register_replicate,
+ .obj_size=(size_t (*)(const void *))LWW_register_mem_size
},
{
.type=OBJ_TYPE_INTEGER,
.type_name="integer",
- .obj_new=(void * (*)(unsigned char *))PN_counter_new,
.obj_free=(void (*)(void *))PN_counter_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))PN_counter_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))PN_counter_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))PN_counter_replicate,
.obj_size=(size_t (*)(const void *))PN_counter_mem_size
},
{
.type=OBJ_TYPE_SET,
.type_name="set",
- .obj_new=(void * (*)(unsigned char *))OR_set_new,
.obj_free=(void (*)(void *))OR_set_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))OR_set_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))OR_set_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))OR_set_replicate,
.obj_size=(size_t (*)(const void *))OR_set_mem_size
},
{
.type=OBJ_TYPE_HASH,
.type_name="hash",
- .obj_new=(void * (*)(unsigned char *))OR_map_new,
.obj_free=(void (*)(void *))OR_map_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))OR_map_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))OR_map_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))OR_map_replicate,
.obj_size=(size_t (*)(const void *))OR_map_mem_size
},
{
.type=OBJ_TYPE_TOKEN_BUCKET,
.type_name="token-bucket",
- .obj_new=__wrap_OC_token_bucket_new,
.obj_free=(void (*)(void *))OC_token_bucket_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))OC_token_bucket_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))OC_token_bucket_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))OC_token_bucket_replicate,
.obj_size=(size_t (*)(const void *))OC_token_bucket_mem_size,
},
{
.type=OBJ_TYPE_FAIR_TOKEN_BUCKET,
.type_name="fair-token-bucket",
- .obj_new=__wrap_fair_token_bucket_new,
.obj_free=(void (*)(void *))fair_token_bucket_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))fair_token_bucket_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))fair_token_bucket_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))fair_token_bucket_replicate,
.obj_size=(size_t (*)(const void *))fair_token_bucket_mem_size,
},
{
.type=OBJ_TYPE_BULK_TOKEN_BUCKET,
.type_name="bulk-token-bucket",
- .obj_new=__wrap_bulk_token_bucket_new,
.obj_free=(void (*)(void *))bulk_token_bucket_free,
.obj_serialize=(void (*)(const void *, char **, size_t *))bulk_token_bucket_serialize,
.obj_merge_blob=(void (*)(void *, const char *, size_t))bulk_token_bucket_merge_blob,
+ .obj_replicate=(void * (*)(uuid_t, const char *, size_t))bulk_token_bucket_replicate,
.obj_size=(size_t (*)(const void *))bulk_token_bucket_mem_size
},
{
.type=OBJ_TYPE_UNDEFINED,
.type_name="undefined",
- .obj_new=NULL,
.obj_free=undefined_obj_free,
.obj_serialize=NULL,
.obj_merge_blob=NULL,
+ .obj_replicate=NULL,
.obj_size=(size_t (*)(const void *))undefined_obj_mem_size
}
};
@@ -440,9 +416,12 @@ void sobj_merge_blob(struct sobj *obj, const char *blob, size_t blob_sz, uuid_t
const char *value_blob=blob+offset;
if(!obj->raw)
{
- obj->raw=sobj_specs[obj->type].obj_new(uuid);
+ obj->raw=sobj_specs[obj->type].obj_replicate(uuid, value_blob, value_blob_sz);
+ }
+ else
+ {
+ sobj_specs[obj->type].obj_merge_blob(obj->raw, value_blob, value_blob_sz);
}
- sobj_specs[obj->type].obj_merge_blob(obj->raw, value_blob, value_blob_sz);
return;
}