diff options
| author | [email protected] <[email protected]> | 2021-07-16 16:06:59 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-07-16 16:06:59 +0800 |
| commit | 26b1a0850061a6fad963772991abcd6303cd50f3 (patch) | |
| tree | efa72e09e43cf88bd8229118ea17f3947a672059 /client/nirvana_conhash.cpp | |
创建
Diffstat (limited to 'client/nirvana_conhash.cpp')
| -rw-r--r-- | client/nirvana_conhash.cpp | 482 |
1 files changed, 482 insertions, 0 deletions
diff --git a/client/nirvana_conhash.cpp b/client/nirvana_conhash.cpp new file mode 100644 index 0000000..9a95403 --- /dev/null +++ b/client/nirvana_conhash.cpp @@ -0,0 +1,482 @@ +#include <stdint.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <assert.h> +#include <math.h> + +#include <map> + +#include "nirvana_murmurhash.h" +#include "nirvana_conhash.h" + +using namespace std; + +#ifndef offsetof +#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) +#endif + +struct ch_point +{ + u_int32_t bucket_id; + u_int32_t bucket_index; /* which backend it belongs to, use IP address */ + u_int64_t hit_cnt; + u_int64_t point_val; /* hash code of this nodes, it is used to map node to consistent hash cycle */ +}; + +struct ch_bucket_inner +{ + struct conhash_bucket bucket; + + struct ch_point point_array[CONHASH_MAX_POINTS_PER_BUCKET]; //��ʹ��point_val��Աȥ�� + int32_t is_valid; + int32_t bucket_index; + u_int64_t hit_cnt; +}; + +struct consistent_hash +{ + struct ch_bucket_inner *bucket_array; + u_int32_t bucket_array_size; + u_int32_t bucket_cnt; + u_int32_t point_num; + struct ch_point *point_array; + map<u_int32_t, u_int32_t> *map_id_index; +}; + +//coefficient of variation of the RMSD +double conhash_calulate_CVRSMD(struct consistent_hash *ch) +{ + struct ch_bucket_inner* b=NULL; + u_int32_t i=0; + double sum_hit=0, sum_point=0; + double MSE=0,RMSD=0,CVRMSD=0; + for(i=0;i<ch->bucket_array_size;i++) + { + b=ch->bucket_array+i; + if(b->is_valid==0) + { + continue; + } + sum_hit+=(double)b->hit_cnt; + sum_point+=(double)b->bucket.point_num; + } + for(i=0;i<ch->bucket_array_size;i++) + { + b=ch->bucket_array+i; + if(b->is_valid==0) + { + continue; + } + MSE+=pow(b->hit_cnt-(b->bucket.point_num*sum_hit)/sum_point,2); + } + RMSD = sqrt(MSE/ch->bucket_cnt); + CVRMSD = RMSD/(sum_hit/ch->bucket_cnt); + return CVRMSD; +} + +static int qsort_cmp_by_key_increase(const void* a, const void* b) +{ + if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val) + { + return 1; + } + else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val) + { + return 0; + } + else + { + return -1; + } +} + +static int qsort_cmp_by_key_decrease(const void* a, const void* b) +{ + if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val) + { + return -1; + } + else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val) + { + return 0; + } + else + { + return 1; + } +} + +// (vector<int>& nums, int target) +static u_int32_t search_up_bound(u_int64_t target, const void *base, + int32_t nmemb, size_t size,int val_offset) +{ + int32_t low = 0, high = nmemb-1, mid; + + // Invariant: the desired index is between [low, high+1] + + while (low <= high) + { + mid = low + (high-low)/2; + if(*(u_int64_t*)((char*)base+size*mid+val_offset) < target) + { + low = mid+1; + } + else + { + high = mid-1; + } + } + if(low == nmemb) + { + low=0; + } + // (1) At this point, low > high. That is, low >= high+1 + // (2) From the invariant, we know that the index is between [low, high+1], so low <= high+1. Follwing from (1), now we know low == high+1. + // (3) Following from (2), the index is between [low, high+1] = [low, low], which means that low is the desired index + // Therefore, we return low as the answer. You can also return high+1 as the result, since low == high+1 + return low; +} + +//��֤��ͬ��bucket_id&&point_index������ͬ��point_id +static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_int32_t cur_point_index) +{ + u_int64_t x=0, seed; + u_int32_t hash, i=0; + + seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id; + hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673); + x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; + + while(i != cur_point_index) + { + for(i=0; i<cur_point_index; i++) + { + if(x == inner_bucket->point_array[i].point_val) //��ͻ + { + seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; + hash = murmurhash2(&seed, sizeof(u_int64_t), 23068673); + x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; + i = 0; + break; + } + } + } + inner_bucket->point_array[cur_point_index].point_val = x; + return x; +} + +u_int32_t conhash_get_bucket_num(struct consistent_hash *ch) +{ + return ch->bucket_cnt; +} + +struct consistent_hash *conhash_instance_new(const struct conhash_bucket *buckets, uint32_t bucket_num) +{ + struct consistent_hash *ch=NULL; + u_int32_t i, j, k; + u_int64_t randval; + + for(i=0; i<bucket_num; i++) + { + if(buckets[i].point_num > CONHASH_MAX_POINTS_PER_BUCKET) + { + return NULL; + } + } + ch = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash)); + + /*buckets*/ + ch->map_id_index = new map<u_int32_t, u_int32_t>; + ch->bucket_array = (struct ch_bucket_inner*)calloc(1, sizeof(struct ch_bucket_inner)*bucket_num); + for(i=0; i<bucket_num; i++) + { + memcpy(&(ch->bucket_array[i].bucket), &buckets[i], sizeof(struct conhash_bucket)); + ch->bucket_array[i].is_valid = 1; + ch->bucket_array[i].bucket_index = i; + ch->point_num += buckets[i].point_num; + ch->map_id_index->insert(make_pair(buckets[i].bucket_id, i)); + } + ch->bucket_cnt = bucket_num; + + /*global points*/ + ch->point_array = (struct ch_point*)calloc(1, sizeof(struct ch_point)*ch->point_num); + ch->bucket_array_size = bucket_num; + for(i=0, k=0; i<ch->bucket_array_size; i++) + { + for(j=0; j<ch->bucket_array[i].bucket.point_num; j++,k++) + { + randval = bucket_gen_uniq_point(&ch->bucket_array[i], j); + ch->point_array[k].bucket_id = ch->bucket_array[i].bucket.bucket_id; + ch->point_array[k].bucket_index = i; + ch->point_array[k].point_val = randval; + ch->point_array[k].hit_cnt = 0; + } + qsort(ch->bucket_array[i].point_array, ch->bucket_array[i].bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease); + } + qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase); + return ch; +} + +void conhash_instance_free(struct consistent_hash *ch) +{ + free(ch->point_array); + free(ch->bucket_array); + delete ch->map_id_index; + free(ch); +} + +struct consistent_hash *conhash_instance_copy(struct consistent_hash *ch) +{ + struct consistent_hash *copy; + + copy = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash)); + copy->bucket_array_size = ch->bucket_array_size; + copy->bucket_cnt = ch->bucket_cnt; + copy->point_num = ch->point_num; + copy->bucket_array = (struct ch_bucket_inner*)calloc(sizeof(struct ch_bucket_inner), ch->bucket_array_size); + memcpy(copy->bucket_array, ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size); + + copy->point_array = (struct ch_point*)calloc(sizeof(struct ch_point), copy->point_num); + memcpy(copy->point_array, ch->point_array, sizeof(struct ch_point)*copy->point_num); + + copy->map_id_index = new map<u_int32_t, u_int32_t>; + copy->map_id_index->insert(ch->map_id_index->begin(), ch->map_id_index->end()); + return copy; +} + +static enum CONHASH_ERRCODE conhash_add_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, int32_t add_points) +{ + u_int64_t randval; + + if(inner_bucket->bucket.point_num + add_points > CONHASH_MAX_POINTS_PER_BUCKET) + { + assert(0);return CONHASH_ERR_INVALID_ARGS; + } + + ch->point_array = (struct ch_point *)realloc(ch->point_array,sizeof(struct ch_point)*(ch->point_num+add_points)); + + for(u_int32_t i=ch->point_num; i<ch->point_num+add_points; i++) + { + randval = bucket_gen_uniq_point(inner_bucket, inner_bucket->bucket.point_num); + inner_bucket->bucket.point_num++; + ch->point_array[i].bucket_id = inner_bucket->bucket.bucket_id; + ch->point_array[i].bucket_index = inner_bucket->bucket_index; + ch->point_array[i].point_val = randval; + ch->point_array[i].hit_cnt = 0; + } + ch->point_num += add_points; + qsort(inner_bucket->point_array, inner_bucket->bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease); + qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase); + return CONHASH_OK; +} + +static enum CONHASH_ERRCODE conhash_del_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, u_int32_t del_points) +{ + struct ch_point *tmp_points; + u_int32_t i, j, removed; + + if(inner_bucket->bucket.point_num == 0) + { + return CONHASH_OK; + } + if(inner_bucket->bucket.point_num < del_points) + { + assert(0);return CONHASH_ERR_INVALID_ARGS; + } + + tmp_points = (struct ch_point*)malloc(sizeof(struct ch_point)*ch->point_num); + memcpy(tmp_points, ch->point_array, sizeof(struct ch_point)*ch->point_num); + + for(i=0,j=0,removed=0; i<ch->point_num; i++) + { + if(removed<del_points && tmp_points[i].bucket_id==inner_bucket->bucket.bucket_id) + { + assert(inner_bucket->point_array[inner_bucket->bucket.point_num-1].point_val == tmp_points[i].point_val); + inner_bucket->bucket.point_num--; + removed++; + continue; + } + memcpy(&ch->point_array[j], &tmp_points[i], sizeof(struct ch_point)); + j++; + } + assert(removed == del_points); + free(tmp_points); + ch->point_num -= del_points; + //Sort is unnecessary after deletion. + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const struct conhash_bucket *bucket) +{ + struct ch_bucket_inner *inner_bucket=NULL; + u_int32_t i, bucket_index; + map<u_int32_t, u_int32_t>::iterator iter; + enum CONHASH_ERRCODE code; + + if(bucket->point_num <= 0) + { + return CONHASH_ERR_INVALID_ARGS; + } + if((iter=ch->map_id_index->find(bucket->bucket_id)) != ch->map_id_index->end()) + { + return CONHASH_BUCKET_ALREADY_EXIST; + } + + if(ch->bucket_cnt < ch->bucket_array_size) + { + for(i=0; i<ch->bucket_array_size; i++) + { + if(ch->bucket_array[i].is_valid == 0) + { + bucket_index = i; + break; + } + } + assert(i < ch->bucket_array_size && ch->bucket_array[bucket_index].bucket.point_num==0); //һ�����ҵ� + + inner_bucket = &ch->bucket_array[bucket_index]; + } + else + { + assert(ch->bucket_array_size == ch->bucket_cnt); + bucket_index = ch->bucket_cnt; + ch->bucket_array_size = ch->bucket_cnt + 1; + ch->bucket_array = (struct ch_bucket_inner*)realloc(ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size); + memset(&ch->bucket_array[bucket_index], 0, sizeof(struct ch_bucket_inner)); + inner_bucket = &ch->bucket_array[bucket_index]; + } + inner_bucket->bucket.bucket_id = bucket->bucket_id; + inner_bucket->bucket.tag = bucket->tag; + + if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num))) + { + return code; + } + inner_bucket->is_valid = 1; + inner_bucket->bucket_index = bucket_index; + inner_bucket->hit_cnt = 0; + ch->bucket_cnt++; + ch->map_id_index->insert(make_pair(bucket->bucket_id, bucket_index)); + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_remove_bucket(struct consistent_hash *ch, u_int32_t bucket_id, void (*free_cb)(void *tag, u_int32_t point_num)) +{ + struct ch_bucket_inner* inner_bucket=NULL; + u_int32_t bucket_index; + map<u_int32_t, u_int32_t>::iterator iter; + enum CONHASH_ERRCODE code; + + if((iter=ch->map_id_index->find(bucket_id)) == ch->map_id_index->end()) + { + return CONHASH_BUCKET_NOT_FOUND; + } + bucket_index = iter->second; + assert(bucket_index < ch->bucket_array_size); + + inner_bucket = &ch->bucket_array[bucket_index]; + if(CONHASH_OK != (code=conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num))) + { + return code; + } + ch->bucket_cnt--; + inner_bucket->is_valid = 0; + if(free_cb) + { + free_cb(inner_bucket->bucket.tag, inner_bucket->bucket.point_num); + } + inner_bucket->bucket.point_num = 0; + inner_bucket->bucket.tag = NULL; + ch->map_id_index->erase(bucket_id); + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_renew_bucket(struct consistent_hash *ch, struct conhash_bucket *bucket) +{ + struct ch_bucket_inner* inner_bucket=NULL; + u_int32_t bucket_index; + map<u_int32_t, u_int32_t>::iterator iter; + + if((iter=ch->map_id_index->find(bucket->bucket_id)) == ch->map_id_index->end()) + { + assert(0);return CONHASH_BUCKET_NOT_FOUND; + } + bucket_index = iter->second; + assert(bucket_index < ch->bucket_array_size); + + inner_bucket = &ch->bucket_array[bucket_index]; + assert(inner_bucket->is_valid == 1); + inner_bucket->bucket.tag = bucket->tag; + + if(inner_bucket->bucket.point_num == bucket->point_num) + { + return CONHASH_OK; + } + else if(inner_bucket->bucket.point_num < bucket->point_num) + { + return conhash_add_points(ch, inner_bucket, bucket->point_num-inner_bucket->bucket.point_num); + } + else + { + return conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num-bucket->point_num); + } +} + +enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const void* key, int len, struct conhash_bucket* result) +{ + int idx=0, bucket_index=0; + u_int64_t hash; + + if(ch->bucket_cnt == 0) + { + return CONHASH_NO_VALID_BUCKETS; + } + + hash = MurmurHash64A(key, len, 23068673); + idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val)); + ch->point_array[idx].hit_cnt++; + bucket_index = ch->point_array[idx].bucket_index; + assert(ch->bucket_array[bucket_index].is_valid == 1); + ch->bucket_array[bucket_index].hit_cnt++; + memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket)); + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_lookup_bucket_int(struct consistent_hash *ch, u_int64_t randint, struct conhash_bucket* result) +{ + int idx=0, bucket_index=0; + + if(ch->bucket_cnt == 0) + { + return CONHASH_NO_VALID_BUCKETS; + } + + idx = search_up_bound(randint, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val)); + ch->point_array[idx].hit_cnt++; + bucket_index = ch->point_array[idx].bucket_index; + assert(ch->bucket_array[bucket_index].is_valid == 1); + ch->bucket_array[bucket_index].hit_cnt++; + memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket)); + return CONHASH_OK; +} + +void conhash_dump_detail(struct consistent_hash *ch) +{ + /*for(u_int32_t i=0; i<ch->point_num; i++) + { + printf("bucket_id: %10u, bucket_index: %5u, point_val:%lx, hit_cnt: %lu\n", ch->point_array[i].bucket_id, + ch->point_array[i].bucket_index, ch->point_array[i].point_val, ch->point_array[i].hit_cnt); + } + + printf("\n\n\n\n");*/ + for(u_int32_t i=0; i<ch->bucket_cnt; i++) + { + if(ch->bucket_array[i].is_valid) + { + printf("bucket_id: %10u, bucket_index: %5u, hit_cnt: %lu\n", ch->bucket_array[i].bucket.bucket_id, + i, ch->bucket_array[i].hit_cnt); + } + } +} + |
