summaryrefslogtreecommitdiff
path: root/client/nirvana_conhash.cpp
diff options
context:
space:
mode:
author[email protected] <[email protected]>2021-07-16 16:06:59 +0800
committer[email protected] <[email protected]>2021-07-16 16:06:59 +0800
commit26b1a0850061a6fad963772991abcd6303cd50f3 (patch)
treeefa72e09e43cf88bd8229118ea17f3947a672059 /client/nirvana_conhash.cpp
创建
Diffstat (limited to 'client/nirvana_conhash.cpp')
-rw-r--r--client/nirvana_conhash.cpp482
1 files changed, 482 insertions, 0 deletions
diff --git a/client/nirvana_conhash.cpp b/client/nirvana_conhash.cpp
new file mode 100644
index 0000000..9a95403
--- /dev/null
+++ b/client/nirvana_conhash.cpp
@@ -0,0 +1,482 @@
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <math.h>
+
+#include <map>
+
+#include "nirvana_murmurhash.h"
+#include "nirvana_conhash.h"
+
+using namespace std;
+
+#ifndef offsetof
+#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
+#endif
+
+struct ch_point
+{
+ u_int32_t bucket_id;
+ u_int32_t bucket_index; /* which backend it belongs to, use IP address */
+ u_int64_t hit_cnt;
+ u_int64_t point_val; /* hash code of this nodes, it is used to map node to consistent hash cycle */
+};
+
+struct ch_bucket_inner
+{
+ struct conhash_bucket bucket;
+
+ struct ch_point point_array[CONHASH_MAX_POINTS_PER_BUCKET]; //��ʹ��point_val��Աȥ��
+ int32_t is_valid;
+ int32_t bucket_index;
+ u_int64_t hit_cnt;
+};
+
+struct consistent_hash
+{
+ struct ch_bucket_inner *bucket_array;
+ u_int32_t bucket_array_size;
+ u_int32_t bucket_cnt;
+ u_int32_t point_num;
+ struct ch_point *point_array;
+ map<u_int32_t, u_int32_t> *map_id_index;
+};
+
+//coefficient of variation of the RMSD
+double conhash_calulate_CVRSMD(struct consistent_hash *ch)
+{
+ struct ch_bucket_inner* b=NULL;
+ u_int32_t i=0;
+ double sum_hit=0, sum_point=0;
+ double MSE=0,RMSD=0,CVRMSD=0;
+ for(i=0;i<ch->bucket_array_size;i++)
+ {
+ b=ch->bucket_array+i;
+ if(b->is_valid==0)
+ {
+ continue;
+ }
+ sum_hit+=(double)b->hit_cnt;
+ sum_point+=(double)b->bucket.point_num;
+ }
+ for(i=0;i<ch->bucket_array_size;i++)
+ {
+ b=ch->bucket_array+i;
+ if(b->is_valid==0)
+ {
+ continue;
+ }
+ MSE+=pow(b->hit_cnt-(b->bucket.point_num*sum_hit)/sum_point,2);
+ }
+ RMSD = sqrt(MSE/ch->bucket_cnt);
+ CVRMSD = RMSD/(sum_hit/ch->bucket_cnt);
+ return CVRMSD;
+}
+
+static int qsort_cmp_by_key_increase(const void* a, const void* b)
+{
+ if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val)
+ {
+ return 1;
+ }
+ else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val)
+ {
+ return 0;
+ }
+ else
+ {
+ return -1;
+ }
+}
+
+static int qsort_cmp_by_key_decrease(const void* a, const void* b)
+{
+ if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val)
+ {
+ return -1;
+ }
+ else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val)
+ {
+ return 0;
+ }
+ else
+ {
+ return 1;
+ }
+}
+
+// (vector<int>& nums, int target)
+static u_int32_t search_up_bound(u_int64_t target, const void *base,
+ int32_t nmemb, size_t size,int val_offset)
+{
+ int32_t low = 0, high = nmemb-1, mid;
+
+ // Invariant: the desired index is between [low, high+1]
+
+ while (low <= high)
+ {
+ mid = low + (high-low)/2;
+ if(*(u_int64_t*)((char*)base+size*mid+val_offset) < target)
+ {
+ low = mid+1;
+ }
+ else
+ {
+ high = mid-1;
+ }
+ }
+ if(low == nmemb)
+ {
+ low=0;
+ }
+ // (1) At this point, low > high. That is, low >= high+1
+ // (2) From the invariant, we know that the index is between [low, high+1], so low <= high+1. Follwing from (1), now we know low == high+1.
+ // (3) Following from (2), the index is between [low, high+1] = [low, low], which means that low is the desired index
+ // Therefore, we return low as the answer. You can also return high+1 as the result, since low == high+1
+ return low;
+}
+
+//��֤��ͬ��bucket_id&&point_index������ͬ��point_id
+static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_int32_t cur_point_index)
+{
+ u_int64_t x=0, seed;
+ u_int32_t hash, i=0;
+
+ seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id;
+ hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673);
+ x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
+
+ while(i != cur_point_index)
+ {
+ for(i=0; i<cur_point_index; i++)
+ {
+ if(x == inner_bucket->point_array[i].point_val) //��ͻ
+ {
+ seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
+ hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673);
+ x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
+ i = 0;
+ break;
+ }
+ }
+ }
+ inner_bucket->point_array[cur_point_index].point_val = x;
+ return x;
+}
+
+u_int32_t conhash_get_bucket_num(struct consistent_hash *ch)
+{
+ return ch->bucket_cnt;
+}
+
+struct consistent_hash *conhash_instance_new(const struct conhash_bucket *buckets, uint32_t bucket_num)
+{
+ struct consistent_hash *ch=NULL;
+ u_int32_t i, j, k;
+ u_int64_t randval;
+
+ for(i=0; i<bucket_num; i++)
+ {
+ if(buckets[i].point_num > CONHASH_MAX_POINTS_PER_BUCKET)
+ {
+ return NULL;
+ }
+ }
+ ch = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash));
+
+ /*buckets*/
+ ch->map_id_index = new map<u_int32_t, u_int32_t>;
+ ch->bucket_array = (struct ch_bucket_inner*)calloc(1, sizeof(struct ch_bucket_inner)*bucket_num);
+ for(i=0; i<bucket_num; i++)
+ {
+ memcpy(&(ch->bucket_array[i].bucket), &buckets[i], sizeof(struct conhash_bucket));
+ ch->bucket_array[i].is_valid = 1;
+ ch->bucket_array[i].bucket_index = i;
+ ch->point_num += buckets[i].point_num;
+ ch->map_id_index->insert(make_pair(buckets[i].bucket_id, i));
+ }
+ ch->bucket_cnt = bucket_num;
+
+ /*global points*/
+ ch->point_array = (struct ch_point*)calloc(1, sizeof(struct ch_point)*ch->point_num);
+ ch->bucket_array_size = bucket_num;
+ for(i=0, k=0; i<ch->bucket_array_size; i++)
+ {
+ for(j=0; j<ch->bucket_array[i].bucket.point_num; j++,k++)
+ {
+ randval = bucket_gen_uniq_point(&ch->bucket_array[i], j);
+ ch->point_array[k].bucket_id = ch->bucket_array[i].bucket.bucket_id;
+ ch->point_array[k].bucket_index = i;
+ ch->point_array[k].point_val = randval;
+ ch->point_array[k].hit_cnt = 0;
+ }
+ qsort(ch->bucket_array[i].point_array, ch->bucket_array[i].bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease);
+ }
+ qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase);
+ return ch;
+}
+
+void conhash_instance_free(struct consistent_hash *ch)
+{
+ free(ch->point_array);
+ free(ch->bucket_array);
+ delete ch->map_id_index;
+ free(ch);
+}
+
+struct consistent_hash *conhash_instance_copy(struct consistent_hash *ch)
+{
+ struct consistent_hash *copy;
+
+ copy = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash));
+ copy->bucket_array_size = ch->bucket_array_size;
+ copy->bucket_cnt = ch->bucket_cnt;
+ copy->point_num = ch->point_num;
+ copy->bucket_array = (struct ch_bucket_inner*)calloc(sizeof(struct ch_bucket_inner), ch->bucket_array_size);
+ memcpy(copy->bucket_array, ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size);
+
+ copy->point_array = (struct ch_point*)calloc(sizeof(struct ch_point), copy->point_num);
+ memcpy(copy->point_array, ch->point_array, sizeof(struct ch_point)*copy->point_num);
+
+ copy->map_id_index = new map<u_int32_t, u_int32_t>;
+ copy->map_id_index->insert(ch->map_id_index->begin(), ch->map_id_index->end());
+ return copy;
+}
+
+static enum CONHASH_ERRCODE conhash_add_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, int32_t add_points)
+{
+ u_int64_t randval;
+
+ if(inner_bucket->bucket.point_num + add_points > CONHASH_MAX_POINTS_PER_BUCKET)
+ {
+ assert(0);return CONHASH_ERR_INVALID_ARGS;
+ }
+
+ ch->point_array = (struct ch_point *)realloc(ch->point_array,sizeof(struct ch_point)*(ch->point_num+add_points));
+
+ for(u_int32_t i=ch->point_num; i<ch->point_num+add_points; i++)
+ {
+ randval = bucket_gen_uniq_point(inner_bucket, inner_bucket->bucket.point_num);
+ inner_bucket->bucket.point_num++;
+ ch->point_array[i].bucket_id = inner_bucket->bucket.bucket_id;
+ ch->point_array[i].bucket_index = inner_bucket->bucket_index;
+ ch->point_array[i].point_val = randval;
+ ch->point_array[i].hit_cnt = 0;
+ }
+ ch->point_num += add_points;
+ qsort(inner_bucket->point_array, inner_bucket->bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease);
+ qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase);
+ return CONHASH_OK;
+}
+
+static enum CONHASH_ERRCODE conhash_del_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, u_int32_t del_points)
+{
+ struct ch_point *tmp_points;
+ u_int32_t i, j, removed;
+
+ if(inner_bucket->bucket.point_num == 0)
+ {
+ return CONHASH_OK;
+ }
+ if(inner_bucket->bucket.point_num < del_points)
+ {
+ assert(0);return CONHASH_ERR_INVALID_ARGS;
+ }
+
+ tmp_points = (struct ch_point*)malloc(sizeof(struct ch_point)*ch->point_num);
+ memcpy(tmp_points, ch->point_array, sizeof(struct ch_point)*ch->point_num);
+
+ for(i=0,j=0,removed=0; i<ch->point_num; i++)
+ {
+ if(removed<del_points && tmp_points[i].bucket_id==inner_bucket->bucket.bucket_id)
+ {
+ assert(inner_bucket->point_array[inner_bucket->bucket.point_num-1].point_val == tmp_points[i].point_val);
+ inner_bucket->bucket.point_num--;
+ removed++;
+ continue;
+ }
+ memcpy(&ch->point_array[j], &tmp_points[i], sizeof(struct ch_point));
+ j++;
+ }
+ assert(removed == del_points);
+ free(tmp_points);
+ ch->point_num -= del_points;
+ //Sort is unnecessary after deletion.
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const struct conhash_bucket *bucket)
+{
+ struct ch_bucket_inner *inner_bucket=NULL;
+ u_int32_t i, bucket_index;
+ map<u_int32_t, u_int32_t>::iterator iter;
+ enum CONHASH_ERRCODE code;
+
+ if(bucket->point_num <= 0)
+ {
+ return CONHASH_ERR_INVALID_ARGS;
+ }
+ if((iter=ch->map_id_index->find(bucket->bucket_id)) != ch->map_id_index->end())
+ {
+ return CONHASH_BUCKET_ALREADY_EXIST;
+ }
+
+ if(ch->bucket_cnt < ch->bucket_array_size)
+ {
+ for(i=0; i<ch->bucket_array_size; i++)
+ {
+ if(ch->bucket_array[i].is_valid == 0)
+ {
+ bucket_index = i;
+ break;
+ }
+ }
+ assert(i < ch->bucket_array_size && ch->bucket_array[bucket_index].bucket.point_num==0); //һ�����ҵ�
+
+ inner_bucket = &ch->bucket_array[bucket_index];
+ }
+ else
+ {
+ assert(ch->bucket_array_size == ch->bucket_cnt);
+ bucket_index = ch->bucket_cnt;
+ ch->bucket_array_size = ch->bucket_cnt + 1;
+ ch->bucket_array = (struct ch_bucket_inner*)realloc(ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size);
+ memset(&ch->bucket_array[bucket_index], 0, sizeof(struct ch_bucket_inner));
+ inner_bucket = &ch->bucket_array[bucket_index];
+ }
+ inner_bucket->bucket.bucket_id = bucket->bucket_id;
+ inner_bucket->bucket.tag = bucket->tag;
+
+ if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num)))
+ {
+ return code;
+ }
+ inner_bucket->is_valid = 1;
+ inner_bucket->bucket_index = bucket_index;
+ inner_bucket->hit_cnt = 0;
+ ch->bucket_cnt++;
+ ch->map_id_index->insert(make_pair(bucket->bucket_id, bucket_index));
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_remove_bucket(struct consistent_hash *ch, u_int32_t bucket_id, void (*free_cb)(void *tag, u_int32_t point_num))
+{
+ struct ch_bucket_inner* inner_bucket=NULL;
+ u_int32_t bucket_index;
+ map<u_int32_t, u_int32_t>::iterator iter;
+ enum CONHASH_ERRCODE code;
+
+ if((iter=ch->map_id_index->find(bucket_id)) == ch->map_id_index->end())
+ {
+ return CONHASH_BUCKET_NOT_FOUND;
+ }
+ bucket_index = iter->second;
+ assert(bucket_index < ch->bucket_array_size);
+
+ inner_bucket = &ch->bucket_array[bucket_index];
+ if(CONHASH_OK != (code=conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num)))
+ {
+ return code;
+ }
+ ch->bucket_cnt--;
+ inner_bucket->is_valid = 0;
+ if(free_cb)
+ {
+ free_cb(inner_bucket->bucket.tag, inner_bucket->bucket.point_num);
+ }
+ inner_bucket->bucket.point_num = 0;
+ inner_bucket->bucket.tag = NULL;
+ ch->map_id_index->erase(bucket_id);
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_renew_bucket(struct consistent_hash *ch, struct conhash_bucket *bucket)
+{
+ struct ch_bucket_inner* inner_bucket=NULL;
+ u_int32_t bucket_index;
+ map<u_int32_t, u_int32_t>::iterator iter;
+
+ if((iter=ch->map_id_index->find(bucket->bucket_id)) == ch->map_id_index->end())
+ {
+ assert(0);return CONHASH_BUCKET_NOT_FOUND;
+ }
+ bucket_index = iter->second;
+ assert(bucket_index < ch->bucket_array_size);
+
+ inner_bucket = &ch->bucket_array[bucket_index];
+ assert(inner_bucket->is_valid == 1);
+ inner_bucket->bucket.tag = bucket->tag;
+
+ if(inner_bucket->bucket.point_num == bucket->point_num)
+ {
+ return CONHASH_OK;
+ }
+ else if(inner_bucket->bucket.point_num < bucket->point_num)
+ {
+ return conhash_add_points(ch, inner_bucket, bucket->point_num-inner_bucket->bucket.point_num);
+ }
+ else
+ {
+ return conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num-bucket->point_num);
+ }
+}
+
+enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const void* key, int len, struct conhash_bucket* result)
+{
+ int idx=0, bucket_index=0;
+ u_int64_t hash;
+
+ if(ch->bucket_cnt == 0)
+ {
+ return CONHASH_NO_VALID_BUCKETS;
+ }
+
+ hash = MurmurHash64A(key, len, 23068673);
+ idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val));
+ ch->point_array[idx].hit_cnt++;
+ bucket_index = ch->point_array[idx].bucket_index;
+ assert(ch->bucket_array[bucket_index].is_valid == 1);
+ ch->bucket_array[bucket_index].hit_cnt++;
+ memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket));
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_lookup_bucket_int(struct consistent_hash *ch, u_int64_t randint, struct conhash_bucket* result)
+{
+ int idx=0, bucket_index=0;
+
+ if(ch->bucket_cnt == 0)
+ {
+ return CONHASH_NO_VALID_BUCKETS;
+ }
+
+ idx = search_up_bound(randint, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val));
+ ch->point_array[idx].hit_cnt++;
+ bucket_index = ch->point_array[idx].bucket_index;
+ assert(ch->bucket_array[bucket_index].is_valid == 1);
+ ch->bucket_array[bucket_index].hit_cnt++;
+ memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket));
+ return CONHASH_OK;
+}
+
+void conhash_dump_detail(struct consistent_hash *ch)
+{
+ /*for(u_int32_t i=0; i<ch->point_num; i++)
+ {
+ printf("bucket_id: %10u, bucket_index: %5u, point_val:%lx, hit_cnt: %lu\n", ch->point_array[i].bucket_id,
+ ch->point_array[i].bucket_index, ch->point_array[i].point_val, ch->point_array[i].hit_cnt);
+ }
+
+ printf("\n\n\n\n");*/
+ for(u_int32_t i=0; i<ch->bucket_cnt; i++)
+ {
+ if(ch->bucket_array[i].is_valid)
+ {
+ printf("bucket_id: %10u, bucket_index: %5u, hit_cnt: %lu\n", ch->bucket_array[i].bucket.bucket_id,
+ i, ch->bucket_array[i].hit_cnt);
+ }
+ }
+}
+