diff options
| author | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
|---|---|---|
| committer | [email protected] <[email protected]> | 2021-11-02 12:34:05 +0800 |
| commit | 31f55f0b88d4af34a8a36497f5e49c69b88b2fbf (patch) | |
| tree | 63515b3ceb361369cdc88ae6db1a808fc80e5b42 /src | |
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile | 22 | ||||
| -rw-r--r-- | src/nirvana_conhash.cpp | 483 | ||||
| -rw-r--r-- | src/nirvana_conhash.h | 50 | ||||
| -rw-r--r-- | src/nirvana_murmurhash.cpp | 99 | ||||
| -rw-r--r-- | src/nirvana_murmurhash.h | 16 | ||||
| -rw-r--r-- | src/wy_singleflow_broadcast.cpp | 126 | ||||
| -rw-r--r-- | src/wy_singleflow_broadcast.h | 30 | ||||
| -rw-r--r-- | src/wy_singleflow_entry.cpp | 324 | ||||
| -rw-r--r-- | src/wy_singleflow_entry.h | 88 | ||||
| -rw-r--r-- | src/wy_singleflow_keepalive.cpp | 237 | ||||
| -rw-r--r-- | src/wy_singleflow_keepalive.h | 53 |
11 files changed, 1528 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..801e452 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,22 @@ +CC=g++ +CCC=g++ + +CFLAGS = -Wall -g -fPIC + +INC_PATH = -I../include +LIBS = -lMESA_prof_load -lMESA_handle_logger -lMESA_field_stat2 -lMESA_htable + +OBJS = nirvana_conhash.o nirvana_murmurhash.o wy_singleflow_entry.o wy_singleflow_keepalive.o + +TARGET=wy_singleflow.so + +$(TARGET):$(OBJS) + $(CCC) -fPIC -shared $^ -o $@ $(LIBS) +# cp $@ /home/zhangcw/sapp_jcq_dumpfile/plug/business/stream_packets_cache/ + +.cpp.o: + $(CC) -c $(CFLAGS) $(INC_PATH) $< + +clean: + rm -f $(TARGET) $(OBJS) + diff --git a/src/nirvana_conhash.cpp b/src/nirvana_conhash.cpp new file mode 100644 index 0000000..d687242 --- /dev/null +++ b/src/nirvana_conhash.cpp @@ -0,0 +1,483 @@ +#include <stdint.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <assert.h> +#include <math.h> + +#include <map> + +#include "nirvana_murmurhash.h" +#include "nirvana_conhash.h" + +using namespace std; + +#ifndef offsetof +#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) +#endif + +struct ch_point +{ + u_int32_t bucket_id; + u_int32_t bucket_index; /* which backend it belongs to, use IP address */ + u_int64_t hit_cnt; + u_int64_t point_val; /* hash code of this nodes, it is used to map node to consistent hash cycle */ +}; + +struct ch_bucket_inner +{ + struct conhash_bucket bucket; + + struct ch_point point_array[CONHASH_MAX_POINTS_PER_BUCKET]; //��ʹ��point_val��Աȥ�� + int32_t is_valid; + int32_t bucket_index; + u_int64_t hit_cnt; +}; + +struct consistent_hash +{ + struct ch_bucket_inner *bucket_array; + u_int32_t bucket_array_size; + u_int32_t bucket_cnt; + u_int32_t point_num; + struct ch_point *point_array; + map<u_int32_t, u_int32_t> *map_id_index; +}; + +//coefficient of variation of the RMSD +double conhash_calulate_CVRSMD(struct consistent_hash *ch) +{ + struct ch_bucket_inner* b=NULL; + u_int32_t i=0; + double sum_hit=0, sum_point=0; + double MSE=0,RMSD=0,CVRMSD=0; + for(i=0;i<ch->bucket_array_size;i++) + { + b=ch->bucket_array+i; + if(b->is_valid==0) + { + continue; + } + sum_hit+=(double)b->hit_cnt; + sum_point+=(double)b->bucket.point_num; + } + for(i=0;i<ch->bucket_array_size;i++) + { + b=ch->bucket_array+i; + if(b->is_valid==0) + { + continue; + } + MSE+=pow(b->hit_cnt-(b->bucket.point_num*sum_hit)/sum_point,2); + } + RMSD = sqrt(MSE/ch->bucket_cnt); + CVRMSD = RMSD/(sum_hit/ch->bucket_cnt); + return CVRMSD; +} + +static int qsort_cmp_by_key_increase(const void* a, const void* b) +{ + if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val) + { + return 1; + } + else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val) + { + return 0; + } + else + { + return -1; + } +} + +static int qsort_cmp_by_key_decrease(const void* a, const void* b) +{ + if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val) + { + return -1; + } + else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val) + { + return 0; + } + else + { + return 1; + } +} + +// (vector<int>& nums, int target) +static u_int32_t search_up_bound(u_int64_t target, const void *base, + int32_t nmemb, size_t size,int val_offset) +{ + int32_t low = 0, high = nmemb-1, mid; + + // Invariant: the desired index is between [low, high+1] + + while (low <= high) + { + mid = low + (high-low)/2; + if(*(u_int64_t*)((char*)base+size*mid+val_offset) < target) + { + low = mid+1; + } + else + { + high = mid-1; + } + } + if(low == nmemb) + { + low=0; + } + // (1) At this point, low > high. That is, low >= high+1 + // (2) From the invariant, we know that the index is between [low, high+1], so low <= high+1. Follwing from (1), now we know low == high+1. + // (3) Following from (2), the index is between [low, high+1] = [low, low], which means that low is the desired index + // Therefore, we return low as the answer. You can also return high+1 as the result, since low == high+1 + return low; +} + +//��֤��ͬ��bucket_id&&point_index������ͬ��point_id +static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_int32_t cur_point_index) +{ + u_int64_t x=0, seed; + u_int32_t hash, i=0; + + seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id; + hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193); + x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; + + while(i != cur_point_index) + { + for(i=0; i<cur_point_index; i++) + { + if(x == inner_bucket->point_array[i].point_val) //��ͻ + { + seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; + hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193); + x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id; + i = 0; + break; + } + } + } + inner_bucket->point_array[cur_point_index].point_val = x; + return x; +} + +u_int32_t conhash_get_bucket_num(struct consistent_hash *ch) +{ + return ch->bucket_cnt; +} + +struct consistent_hash *conhash_instance_new(const struct conhash_bucket *buckets, uint32_t bucket_num) +{ + struct consistent_hash *ch=NULL; + u_int32_t i, j, k; + u_int64_t randval; + + for(i=0; i<bucket_num; i++) + { + if(buckets[i].point_num > CONHASH_MAX_POINTS_PER_BUCKET) + { + return NULL; + } + } + ch = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash)); + + /*buckets*/ + ch->map_id_index = new map<u_int32_t, u_int32_t>; + ch->bucket_array = (struct ch_bucket_inner*)calloc(1, sizeof(struct ch_bucket_inner)*bucket_num); + for(i=0; i<bucket_num; i++) + { + memcpy(&(ch->bucket_array[i].bucket), &buckets[i], sizeof(struct conhash_bucket)); + ch->bucket_array[i].is_valid = 1; + ch->bucket_array[i].bucket_index = i; + ch->point_num += buckets[i].point_num; + ch->map_id_index->insert(make_pair(buckets[i].bucket_id, i)); + } + ch->bucket_cnt = bucket_num; + + /*global points*/ + ch->point_array = (struct ch_point*)calloc(1, sizeof(struct ch_point)*ch->point_num); + ch->bucket_array_size = bucket_num; + for(i=0, k=0; i<ch->bucket_array_size; i++) + { + for(j=0; j<ch->bucket_array[i].bucket.point_num; j++,k++) + { + randval = bucket_gen_uniq_point(&ch->bucket_array[i], j); + ch->point_array[k].bucket_id = ch->bucket_array[i].bucket.bucket_id; + ch->point_array[k].bucket_index = i; + ch->point_array[k].point_val = randval; + ch->point_array[k].hit_cnt = 0; + } + qsort(ch->bucket_array[i].point_array, ch->bucket_array[i].bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease); + } + qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase); + return ch; +} + +void conhash_instance_free(struct consistent_hash *ch) +{ + free(ch->point_array); + free(ch->bucket_array); + delete ch->map_id_index; + free(ch); +} + +struct consistent_hash *conhash_instance_copy(struct consistent_hash *ch) +{ + struct consistent_hash *copy; + + copy = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash)); + copy->bucket_array_size = ch->bucket_array_size; + copy->bucket_cnt = ch->bucket_cnt; + copy->point_num = ch->point_num; + copy->bucket_array = (struct ch_bucket_inner*)calloc(sizeof(struct ch_bucket_inner), ch->bucket_array_size); + memcpy(copy->bucket_array, ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size); + + copy->point_array = (struct ch_point*)calloc(sizeof(struct ch_point), copy->point_num); + memcpy(copy->point_array, ch->point_array, sizeof(struct ch_point)*copy->point_num); + + copy->map_id_index = new map<u_int32_t, u_int32_t>; + copy->map_id_index->insert(ch->map_id_index->begin(), ch->map_id_index->end()); + return copy; +} + +static enum CONHASH_ERRCODE conhash_add_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, int32_t add_points) +{ + u_int64_t randval; + + if(inner_bucket->bucket.point_num + add_points > CONHASH_MAX_POINTS_PER_BUCKET) + { + assert(0);return CONHASH_ERR_INVALID_ARGS; + } + + ch->point_array = (struct ch_point *)realloc(ch->point_array,sizeof(struct ch_point)*(ch->point_num+add_points)); + + for(u_int32_t i=ch->point_num; i<ch->point_num+add_points; i++) + { + randval = bucket_gen_uniq_point(inner_bucket, inner_bucket->bucket.point_num); + inner_bucket->bucket.point_num++; + ch->point_array[i].bucket_id = inner_bucket->bucket.bucket_id; + ch->point_array[i].bucket_index = inner_bucket->bucket_index; + ch->point_array[i].point_val = randval; + ch->point_array[i].hit_cnt = 0; + } + ch->point_num += add_points; + qsort(inner_bucket->point_array, inner_bucket->bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease); + qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase); + return CONHASH_OK; +} + +static enum CONHASH_ERRCODE conhash_del_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, u_int32_t del_points) +{ + struct ch_point *tmp_points; + u_int32_t i, j, removed; + + if(inner_bucket->bucket.point_num == 0) + { + return CONHASH_OK; + } + if(inner_bucket->bucket.point_num < del_points) + { + assert(0);return CONHASH_ERR_INVALID_ARGS; + } + + tmp_points = (struct ch_point*)malloc(sizeof(struct ch_point)*ch->point_num); + memcpy(tmp_points, ch->point_array, sizeof(struct ch_point)*ch->point_num); + + for(i=0,j=0,removed=0; i<ch->point_num; i++) + { + if(removed<del_points && tmp_points[i].bucket_id==inner_bucket->bucket.bucket_id) + { + assert(inner_bucket->point_array[inner_bucket->bucket.point_num-1].point_val == tmp_points[i].point_val); + inner_bucket->bucket.point_num--; + removed++; + continue; + } + memcpy(&ch->point_array[j], &tmp_points[i], sizeof(struct ch_point)); + j++; + } + assert(removed == del_points); + free(tmp_points); + ch->point_num -= del_points; + //Sort is unnecessary after deletion. + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const struct conhash_bucket *bucket) +{ + struct ch_bucket_inner *inner_bucket=NULL; + u_int32_t i, bucket_index; + map<u_int32_t, u_int32_t>::iterator iter; + enum CONHASH_ERRCODE code; + + if(bucket->point_num <= 0) + { + return CONHASH_ERR_INVALID_ARGS; + } + if((iter=ch->map_id_index->find(bucket->bucket_id)) != ch->map_id_index->end()) + { + return CONHASH_BUCKET_ALREADY_EXIST; + } + + if(ch->bucket_cnt < ch->bucket_array_size) + { + for(i=0; i<ch->bucket_array_size; i++) + { + if(ch->bucket_array[i].is_valid == 0) + { + bucket_index = i; + break; + } + } + assert(i < ch->bucket_array_size && ch->bucket_array[bucket_index].bucket.point_num==0); //һ�����ҵ� + + inner_bucket = &ch->bucket_array[bucket_index]; + } + else + { + assert(ch->bucket_array_size == ch->bucket_cnt); + bucket_index = ch->bucket_cnt; + ch->bucket_array_size = ch->bucket_cnt + 1; + ch->bucket_array = (struct ch_bucket_inner*)realloc(ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size); + memset(&ch->bucket_array[bucket_index], 0, sizeof(struct ch_bucket_inner)); + inner_bucket = &ch->bucket_array[bucket_index]; + } + inner_bucket->bucket.bucket_id = bucket->bucket_id; + inner_bucket->bucket.tag = bucket->tag; + inner_bucket->bucket_index = bucket_index; + + if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num))) + { + return code; + } + inner_bucket->is_valid = 1; + inner_bucket->bucket_index = bucket_index; + inner_bucket->hit_cnt = 0; + ch->bucket_cnt++; + ch->map_id_index->insert(make_pair(bucket->bucket_id, bucket_index)); + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_remove_bucket(struct consistent_hash *ch, u_int32_t bucket_id, void (*free_cb)(void *tag, u_int32_t point_num)) +{ + struct ch_bucket_inner* inner_bucket=NULL; + u_int32_t bucket_index; + map<u_int32_t, u_int32_t>::iterator iter; + enum CONHASH_ERRCODE code; + + if((iter=ch->map_id_index->find(bucket_id)) == ch->map_id_index->end()) + { + return CONHASH_BUCKET_NOT_FOUND; + } + bucket_index = iter->second; + assert(bucket_index < ch->bucket_array_size); + + inner_bucket = &ch->bucket_array[bucket_index]; + if(CONHASH_OK != (code=conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num))) + { + return code; + } + ch->bucket_cnt--; + inner_bucket->is_valid = 0; + if(free_cb) + { + free_cb(inner_bucket->bucket.tag, inner_bucket->bucket.point_num); + } + inner_bucket->bucket.point_num = 0; + inner_bucket->bucket.tag = NULL; + ch->map_id_index->erase(bucket_id); + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_renew_bucket(struct consistent_hash *ch, struct conhash_bucket *bucket) +{ + struct ch_bucket_inner* inner_bucket=NULL; + u_int32_t bucket_index; + map<u_int32_t, u_int32_t>::iterator iter; + + if((iter=ch->map_id_index->find(bucket->bucket_id)) == ch->map_id_index->end()) + { + assert(0);return CONHASH_BUCKET_NOT_FOUND; + } + bucket_index = iter->second; + assert(bucket_index < ch->bucket_array_size); + + inner_bucket = &ch->bucket_array[bucket_index]; + assert(inner_bucket->is_valid == 1); + inner_bucket->bucket.tag = bucket->tag; + + if(inner_bucket->bucket.point_num == bucket->point_num) + { + return CONHASH_OK; + } + else if(inner_bucket->bucket.point_num < bucket->point_num) + { + return conhash_add_points(ch, inner_bucket, bucket->point_num-inner_bucket->bucket.point_num); + } + else + { + return conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num-bucket->point_num); + } +} + +enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const void* key, int len, struct conhash_bucket* result) +{ + int idx=0, bucket_index=0; + u_int64_t hash; + + if(ch->bucket_cnt == 0) + { + return CONHASH_NO_VALID_BUCKETS; + } + + hash = MurmurHash64A(key, len, 515880193); + idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val)); + ch->point_array[idx].hit_cnt++; + bucket_index = ch->point_array[idx].bucket_index; + assert(ch->bucket_array[bucket_index].is_valid == 1); + ch->bucket_array[bucket_index].hit_cnt++; + memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket)); + return CONHASH_OK; +} + +enum CONHASH_ERRCODE conhash_lookup_bucket_int(struct consistent_hash *ch, u_int64_t randint, struct conhash_bucket* result) +{ + int idx=0, bucket_index=0; + + if(ch->bucket_cnt == 0) + { + return CONHASH_NO_VALID_BUCKETS; + } + + idx = search_up_bound(randint, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val)); + ch->point_array[idx].hit_cnt++; + bucket_index = ch->point_array[idx].bucket_index; + assert(ch->bucket_array[bucket_index].is_valid == 1); + ch->bucket_array[bucket_index].hit_cnt++; + memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket)); + return CONHASH_OK; +} + +void conhash_dump_detail(struct consistent_hash *ch) +{ + /*for(u_int32_t i=0; i<ch->point_num; i++) + { + printf("bucket_id: %10u, bucket_index: %5u, point_val:%lx, hit_cnt: %lu\n", ch->point_array[i].bucket_id, + ch->point_array[i].bucket_index, ch->point_array[i].point_val, ch->point_array[i].hit_cnt); + } + + printf("\n\n\n\n");*/ + for(u_int32_t i=0; i<ch->bucket_cnt; i++) + { + if(ch->bucket_array[i].is_valid) + { + printf("bucket_id: %10u, bucket_index: %5u, hit_cnt: %lu\n", ch->bucket_array[i].bucket.bucket_id, + i, ch->bucket_array[i].hit_cnt); + } + } +} + diff --git a/src/nirvana_conhash.h b/src/nirvana_conhash.h new file mode 100644 index 0000000..42320b1 --- /dev/null +++ b/src/nirvana_conhash.h @@ -0,0 +1,50 @@ +#ifndef __NVN_CONSISTENT_HASH_H__ +#define __NVN_CONSISTENT_HASH_H__ + +#include <stdint.h> + +#ifndef CONHASH_MAX_POINTS_PER_BUCKET +#define CONHASH_MAX_POINTS_PER_BUCKET 128 +#endif + +#ifdef __cplusplus +extern "C" +{ +#endif + +enum CONHASH_ERRCODE +{ + CONHASH_OK = 0, + CONHASH_ERR_INVALID_ARGS = -1, + CONHASH_BUCKET_NOT_FOUND = -2, + CONHASH_BUCKET_ALREADY_EXIST=-3, + CONHASH_NO_VALID_BUCKETS=-4, +}; + +struct conhash_bucket +{ + uint32_t bucket_id; + uint32_t point_num; /*should be not more than CONHASH_MAX_POINTS_PER_BUCKET*/ + void* tag; +}; +struct consistent_hash; + +/*API�̲߳���ȫ*/ +struct consistent_hash *conhash_instance_new(const struct conhash_bucket *buckets, uint32_t bucket_num); +void conhash_instance_free(struct consistent_hash *ch); +struct consistent_hash *conhash_instance_copy(struct consistent_hash *ch); + +enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch,const struct conhash_bucket* bucket); +enum CONHASH_ERRCODE conhash_remove_bucket(struct consistent_hash *ch, u_int32_t bucket_id, void (*free_cb)(void *tag, u_int32_t point_num)); +enum CONHASH_ERRCODE conhash_renew_bucket(struct consistent_hash *ch, struct conhash_bucket* bucket); /*����point_num��tag*/ +enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const void* key, int len, struct conhash_bucket *result/*OUT*/); +enum CONHASH_ERRCODE conhash_lookup_bucket_int(struct consistent_hash *ch, u_int64_t randint, struct conhash_bucket* result); + +double conhash_calulate_CVRSMD(struct consistent_hash *p); +u_int32_t conhash_get_bucket_num(struct consistent_hash *ch); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/nirvana_murmurhash.cpp b/src/nirvana_murmurhash.cpp new file mode 100644 index 0000000..193bde9 --- /dev/null +++ b/src/nirvana_murmurhash.cpp @@ -0,0 +1,99 @@ +#include <stdint.h> +#include <stdlib.h> +#include <stdio.h> + +#include "nirvana_murmurhash.h" + +unsigned int murmurhash2(const void * key, int len, const unsigned int seed) +{ + // 'm' and 'r' are mixing constants generated offline. + // They're not really 'magic', they just happen to work well. + + const unsigned int m = 0x5bd1e995; + const int r = 24; + + // Initialize the hash to a 'random' value + + unsigned int h = seed ^ len; + + // Mix 4 bytes at a time into the hash + + const unsigned char * data = (const unsigned char *)key; + + while(len >= 4) + { + unsigned int k = *(unsigned int *)data; + + k *= m; + k ^= k >> r; + k *= m; + + h *= m; + h ^= k; + + data += 4; + len -= 4; + } + + // Handle the last few bytes of the input array + + switch(len) + { + case 3: h ^= data[2] << 16; + case 2: h ^= data[1] << 8; + case 1: h ^= data[0]; + h *= m; + default:break; + }; + + // Do a few final mixes of the hash to ensure the last few + // bytes are well-incorporated. + + h ^= h >> 13; + h *= m; + h ^= h >> 15; + return h; +} + +/*64-bit hash for 64-bit platforms*/ +u_int64_t MurmurHash64A(const void * key, int len, unsigned int seed) +{ + const uint64_t m = 0xc6a4a7935bd1e995; + const int r = 47; + uint64_t k, h = seed ^ (len * m); + const uint64_t * data = (const uint64_t *)key; + const uint64_t * end = data + (len/8); + + while (data != end) + { + k = *data++; + k *= m; + k ^= k >> r; + k *= m; + + h ^= k; + h *= m; + } + + const unsigned char * data2 = (const unsigned char*)data; + + switch (len & 7) + { + case 7: h ^= uint64_t(data2[6]) << 48; + case 6: h ^= uint64_t(data2[5]) << 40; + case 5: h ^= uint64_t(data2[4]) << 32; + case 4: h ^= uint64_t(data2[3]) << 24; + case 3: h ^= uint64_t(data2[2]) << 16; + case 2: h ^= uint64_t(data2[1]) << 8; + case 1: h ^= uint64_t(data2[0]); + h *= m; + }; + + h ^= h >> r; + h *= m; + h ^= h >> r; + + return h; +} + + diff --git a/src/nirvana_murmurhash.h b/src/nirvana_murmurhash.h new file mode 100644 index 0000000..1467943 --- /dev/null +++ b/src/nirvana_murmurhash.h @@ -0,0 +1,16 @@ +#ifndef __NVN_MURMURHASH_H__ +#define __NVN_MURMURHASH_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +unsigned int murmurhash2(const void * key, int len, const unsigned int seed); +u_int64_t MurmurHash64A(const void * key, int len, unsigned int seed); + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/src/wy_singleflow_broadcast.cpp b/src/wy_singleflow_broadcast.cpp new file mode 100644 index 0000000..3cc86c4 --- /dev/null +++ b/src/wy_singleflow_broadcast.cpp @@ -0,0 +1,126 @@ +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <sys/prctl.h> + +#include "wy_singleflow_entry.h" + + +extern struct wysf_global_info g_wysf_global_info; + +static int udp_broadcast_server_socket_init(void *log_runtime, u_int16_t port) +{ + struct sockaddr_in servaddr; + int on = 1, socket_fd; + + if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)//AF_INET:IPV4 AF_INET6:IPV6 + { + return -1; + } + if(setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "setsockopt for udp_port %u failed, because:%s!!", port, strerror(errno)); + } + on = 5242880; + if(setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &on, sizeof(on))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "setsockopt for udp_port %u failed, because:%s!!", port, strerror(errno)); + } + + bzero(&servaddr,sizeof(servaddr)); + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + servaddr.sin_port = htons(port); + + if(bind(socket_fd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "bind udp socket port %u failed, because: %s!!", port, strerror(errno)); + close(socket_fd); + return -2; + } + evutil_make_socket_nonblocking(socket_fd); + return socket_fd; +} + +static long broadcast_stream_tuple_htable_cb(void *data, const uchar *key, uint size, void *user_arg) +{ + int32_t clj_ip=*(int32_t *)user_arg; + std::map<int32_t, struct judian_as_group*>::iterator iter; + struct judian_as_group *group; + + if((iter = g_wysf_global_info.forwardip2judian->find(clj_ip)) == g_wysf_global_info.forwardip2judian->end()) + { + char ipbuffer[32]; + inet_ntop(AF_INET, &clj_ip, ipbuffer, 32); + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "lookup group for clj_ip: %s not found!!", ipbuffer); + return -1; + } + group = iter->second; + + if(data != NULL) + { + MESA_htable_del(g_wysf_global_info.stream_tuple_mapping, key, size, NULL); + } + MESA_htable_add(g_wysf_global_info.stream_tuple_mapping, key, size, group); + return 0; +} + +void udp_server_read_broadcast_cb(evutil_socket_t fd, short events, void *arg) +{ + struct udp_broadcast_msg *broadmsg; + char buffer[4096]; + int from_len = 0, buf_len, clj_ip; + struct sockaddr_in saddr; + long cb_ret; + + from_len = sizeof (struct sockaddr); + while((buf_len = recvfrom (fd, buffer, 4096, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0) + { + broadmsg = (struct udp_broadcast_msg *)buffer; + if((u_int32_t)buf_len!=sizeof(struct udp_broadcast_msg) || broadmsg->header.magic_number != WYSF_MAGIC_NUMBER || + broadmsg->header.total_len!=sizeof(struct broadcast_message)) + { + continue; + } + + clj_ip = broadmsg->msg.clj_ip; + broadmsg->msg.clj_ip = 0; + MESA_htable_search_cb(g_wysf_global_info.stream_tuple_mapping, (uchar*)&broadmsg->msg, sizeof(struct broadcast_message), + broadcast_stream_tuple_htable_cb, &clj_ip, &cb_ret); + + from_len = sizeof (struct sockaddr); + } +} + +void *thread_fwdnode_breoadcast(void *arg) +{ + struct event_base *evbase; + struct event udp_server_event; + + evbase = event_base_new(); + + if((g_wysf_global_info.broadcast_udp_servert_sockfd = udp_broadcast_server_socket_init(g_wysf_global_info.log_runtime, g_wysf_global_info.broadcast_udp_server_port)) < 0) + { + assert(0);return NULL; + } + event_assign(&udp_server_event, evbase, g_wysf_global_info.broadcast_udp_servert_sockfd, + EV_READ|EV_PERSIST, udp_server_read_broadcast_cb, NULL); + event_add(&udp_server_event, NULL); + + event_base_dispatch(evbase); + printf("Libevent dispath error, should not run here.\n"); + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); + assert(0);return NULL; +} + diff --git a/src/wy_singleflow_broadcast.h b/src/wy_singleflow_broadcast.h new file mode 100644 index 0000000..cbdb8a4 --- /dev/null +++ b/src/wy_singleflow_broadcast.h @@ -0,0 +1,30 @@ +#ifndef __WYSF_BROADCAST_H__ +#define __WYSF_BROADCAST_H__ + +struct udp_message_header +{ + u_int16_t magic_number; + u_int8_t version; + u_int8_t struct_type; + u_int32_t total_len; +}; + +struct broadcast_message +{ + int32_t src_ip; + int32_t dst_ip; + int16_t src_port; + int16_t dst_port; + int32_t clj_ip; +}; + +struct udp_broadcast_msg +{ + struct udp_message_header header; + struct broadcast_message msg; +}; + +void *thread_fwdnode_breoadcast(void *arg); + +#endif + diff --git a/src/wy_singleflow_entry.cpp b/src/wy_singleflow_entry.cpp new file mode 100644 index 0000000..a464d8f --- /dev/null +++ b/src/wy_singleflow_entry.cpp @@ -0,0 +1,324 @@ +#include <assert.h> +#include <string.h> +#include <unistd.h> +#include <ctype.h> +#include <stdlib.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <netinet/ip6.h> +#include <sys/time.h> +#include <errno.h> + +#include <MESA/MESA_prof_load.h> +#include <MESA/stream.h> + +#include "wy_singleflow_entry.h" + +struct wysf_global_info g_wysf_global_info; + + +char WYSF_TCP_STREAM_PKT_ENTRY(struct streaminfo *a_stream, void **pme, int thread_seq, void *a_packet) +{ + struct wysf_stream_context *context=(struct wysf_stream_context *)*pme; + + if(a_stream->pktstate == OP_STATE_PENDING) + { + context = *pme = (struct wysf_stream_context *)dictator_malloc(thread_seq, sizeof(struct wysf_stream_context)); + context->func_node = NULL; + } + + + return APP_STATE_GIVEME; +} + + +static int wysf_register_field_stat(struct wysf_global_info *info) +{ + const char *field_names[WYSF_FSSTAT_FIELD_MAX]={"FlowsTotal", "FlowsIntcpt", "FlowsRtnBack", "FlowsIntBytes", + "BroadCstSent", "BroadCstRecv"}; + const char *status_names[WYSF_FSSTAT_STATUS_MAX]={"FlowTable", "ActiveFuncs", "KeepAliveSW"}; + int value; + + info->fsstat_handle = FS_create_handle(); + FS_set_para(info->fsstat_handle, OUTPUT_DEVICE, info->fsstat_filepath, strlen(info->fsstat_filepath)+1); + if(info->fsstat_print_mode == 1) + { + FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode)); + } + else + { + FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode)); + value = 1; + FS_set_para(info->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value)); + } + value = info->fsstat_period; + FS_set_para(info->fsstat_handle, STAT_CYCLE, &value, sizeof(value)); + value = 0; + FS_set_para(info->fsstat_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(info->fsstat_handle, APP_NAME, info->fsstat_appname, strlen(info->fsstat_appname)+1); + FS_set_para(info->fsstat_handle, STATS_SERVER_IP, info->fsstat_dst_ip, strlen(info->fsstat_dst_ip)+1); + FS_set_para(info->fsstat_handle, STATS_SERVER_PORT, &info->fsstat_dst_port, sizeof(info->fsstat_dst_port)); + + for(int i=0; i<WYSF_FSSTAT_FIELD_MAX; i++) + { + info->fsstat_field_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]); + } + for(int i=0; i<WYSF_FSSTAT_STATUS_MAX; i++) + { + info->fsstat_status_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]); + } + FS_start(info->fsstat_handle); + return 0; +} + +void flow_tuple_htable_data_expires(void *data) +{ +} + +MESA_htable_handle init_and_create_htable(unsigned int slot_size, int expire_time, int lock_num, + void (*data_free)(void *data), + int (*data_expire_with_condition)(void *data, int eliminate_type)) +{ + MESA_htable_create_args_t htable_args; + + memset(&htable_args, 0, sizeof(MESA_htable_create_args_t)); + + htable_args.thread_safe = lock_num; + htable_args.recursive = 1; + htable_args.hash_slot_size = slot_size; + htable_args.max_elem_num = 10*slot_size; + htable_args.eliminate_type = HASH_ELIMINATE_ALGO_FIFO; + htable_args.expire_time = expire_time; + htable_args.data_free = data_free; //�ͷŽڵ��ڴ� + htable_args.data_expire_with_condition = data_expire_with_condition; + + return MESA_htable_create(&htable_args, sizeof(MESA_htable_create_args_t)); +} + +static int udp_broadcast_client_socket_init(void *log_runtime) +{ + int on = 1, socket_fd; + + if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0) + { + MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "create udp socket error: %s", strerror(errno)); + return -1; + } + on = 5242880; + setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on)); + return socket_fd; +} + +static int _unfold_IP_range(char* ip_range, char***ip_list, int size) +{ + int i=0,count=0, ret=0; + int range_digits[5]; + memset(range_digits,0,sizeof(range_digits)); + ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]); + if(ret!=4&&ret!=5) + { + return 0; + } + if(ret==4&&range_digits[4]==0) + { + range_digits[4]=range_digits[3]; + } + for(i=0;i<5;i++) + { + if(range_digits[i]<0||range_digits[i]>255) + { + return 0; + } + } + count=range_digits[4]-range_digits[3]+1; + *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count)); + for(i=0;i<count;i++) + { + (*ip_list)[size+i]=(char*)malloc(64); + snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i); + } + return count; +} + +static int unfold_IP_range(const char* ip_range, char***ip_list) +{ + char *token=NULL,*sub_token=NULL,*saveptr; + char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1); + int count=0; + strcpy(buffer,ip_range); + for (token = buffer; ; token= NULL) + { + sub_token= strtok_r(token,";", &saveptr); + if (sub_token == NULL) + break; + count+=_unfold_IP_range(sub_token, ip_list,count); + } + free(buffer); + return count; +} + +struct judian_as_group *init_function_group_nodes(const char *judian_name, char **ip_list, int ipnum, struct wysf_global_info *info) +{ + struct judian_as_group *group; + + group = (struct judian_as_group *)calloc(1, sizeof(struct judian_as_group)); + group->conhash = conhash_instance_new(NULL, 0); + snprintf(group->groupname, 64, "%s", judian_name); + + group->func_nodes = (struct function_node *)calloc(1, sizeof(struct function_node)*ipnum); + group->func_nodes_num = ipnum; + for(int i=0; i<ipnum; i++) + { + inet_pton(AF_INET, ip_list[i], &group->func_nodes[i].ip_as_bucketid); + group->func_nodes[i].parent = group; + group->func_nodes[i].sinaddr.sin_family = AF_INET; + group->func_nodes[i].sinaddr.sin_addr.s_addr = group->func_nodes[i].ip_as_bucketid; + group->func_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port); + group->func_nodes[i].dst_ip_str = ip_list[i]; + + info->func_keepalive->insert(std::make_pair(group->func_nodes[i].ip_as_bucketid, &group->func_nodes[i])); + } + + assert(info->func_group_num < 1024); + info->judian_group_list[info->func_group_num++] = group; + info->group_judian->insert(std::make_pair(std::string(judian_name), group)); + return group; +} + +int init_forward_group_nodes(struct judian_as_group *group, const char *judian_name, + char **ip_list, int ipnum, struct wysf_global_info *info) +{ + group->fwd_nodes = (struct forward_nodes *)calloc(1, sizeof(struct forward_nodes)*ipnum); + group->fwd_nodes_num = ipnum; + for(int i=0; i<ipnum; i++) + { + inet_pton(AF_INET, ip_list[i], &group->fwd_nodes[i].dst_ip); + group->fwd_nodes[i].parent = group; + group->fwd_nodes[i].sinaddr.sin_family = AF_INET; + group->fwd_nodes[i].sinaddr.sin_addr.s_addr = group->fwd_nodes[i].dst_ip; + group->fwd_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port); + group->fwd_nodes[i].dst_ip_str = ip_list[i]; + if((group->fwd_nodes[i].udp_sockfd = udp_broadcast_client_socket_init(info->log_runtime)) < 0) + { + return -1; + } + + info->forwardip2judian->insert(std::make_pair(group->fwd_nodes[i].dst_ip, group)); + } + return 0; +} + +int load_config_judian_function_iplist(const char *config_file, struct wysf_global_info *info) +{ + char namebuffer[4096], ipbuffer[1024], *judian_name, *save=NULL, **node_iplist; + struct judian_as_group *group; + int ipnum; + + if(0>=MESA_load_profile_string_nodef(config_file, "MODULE", "judian_name_list", namebuffer, sizeof(namebuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [MODULE]judian_name_list not found!", config_file); + assert(0);return -1; + } + + for(judian_name=strtok_r(namebuffer, ";", &save); judian_name!=NULL; judian_name=strtok_r(NULL, ";", &save)) + { + if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_func_ip_list", ipbuffer, sizeof(ipbuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list not found!", config_file, judian_name); + assert(0);return -1; + } + if((ipnum = unfold_IP_range(ipbuffer, &node_iplist)) == 0) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list is empty!", config_file, judian_name); + continue; + } + group = init_function_group_nodes(judian_name, node_iplist, ipnum, info); + free(node_iplist); + + if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_forward_ip_list", ipbuffer, sizeof(ipbuffer))) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list not found!", config_file, judian_name); + assert(0);return -1; + } + if((ipnum=unfold_IP_range(ipbuffer, &node_iplist)) == 0) + { + MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list is empty!", config_file, judian_name); + continue; + } + init_forward_group_nodes(group, judian_name, node_iplist, ipnum, info); + free(node_iplist); + } + return 0; +} + +int WYSF_STREAM_PKT_INIT(void) +{ + char root_log_name[256]; + pthread_t thread_desc; + pthread_attr_t attr; + int log_level; + SAPP_TLV_T tlv_value; + + memset(&g_wysf_global_info, 0, sizeof(struct wysf_global_info)); + g_wysf_global_info.group_judian = new std::map<std::string, struct judian_as_group*>; + g_wysf_global_info.func_keepalive = new std::map<int32_t, struct function_node*>; + g_wysf_global_info.forwardip2judian = new std::map<int32_t, struct judian_as_group*>; + + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_NAME", root_log_name, sizeof(root_log_name), "./log/wysf_runtime.log"); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_LV", &log_level, 10); + g_wysf_global_info.log_runtime = MESA_create_runtime_log_handle(root_log_name, log_level); + if(NULL == g_wysf_global_info.log_runtime) + { + printf("MESA_create_runtime_log_handle %s failed: %s\n", root_log_name, strerror(errno)); + return -1; + } + if(load_config_judian_function_iplist(WYSF_CONFIG_FILE, &g_wysf_global_info)) + { + return -2; + } + + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "broadcast_receive_port", &g_wysf_global_info.broadcast_udp_server_port, 6789); + + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_dest_port", &g_wysf_global_info.bfd_dest_port, 8193); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_ms", &g_wysf_global_info.bfd_timeout_ms, 20); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_times", &g_wysf_global_info.bfd_timeout_times, 3); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_slots", &g_wysf_global_info.flowhtable_slots, 16777216); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_expires", &g_wysf_global_info.flowhtable_expires, 120); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_locks_num", &g_wysf_global_info.htable_locknum, 67); + + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_APPNAME", g_wysf_global_info.fsstat_appname, 16, "WYSF"); + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_FILEPATH", g_wysf_global_info.fsstat_filepath, 256, "./log/wysf.fs"); + MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_INTERVAL", &g_wysf_global_info.fsstat_period, 5); + MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_PRINT_MODE", &g_wysf_global_info.fsstat_print_mode, 1); + MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_IP", g_wysf_global_info.fsstat_dst_ip, 64, "127.0.0.1"); + MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_PORT", &g_wysf_global_info.fsstat_dst_port, 8125); + wysf_register_field_stat(&g_wysf_global_info); + + g_wysf_global_info.stream_tuple_mapping = init_and_create_htable(g_wysf_global_info.flowhtable_slots, + g_wysf_global_info.flowhtable_expires, g_wysf_global_info.htable_locknum, NULL, NULL); + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if(pthread_create(&thread_desc, &attr, thread_funcnode_keepalive, NULL)) + { + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); + assert(0);return -5; + } + if(pthread_create(&thread_desc, &attr, thread_fwdnode_breoadcast, NULL)) + { + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno)); + assert(0);return -5; + } + + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.int_value = 0; + tlv_value.length = sizeof(int); + gdev_keepalive_set_opt(&tlv_value); + return 0; +} + +void WYSF_STREAM_PKT_DESTROY(void) +{ +} + diff --git a/src/wy_singleflow_entry.h b/src/wy_singleflow_entry.h new file mode 100644 index 0000000..47c5961 --- /dev/null +++ b/src/wy_singleflow_entry.h @@ -0,0 +1,88 @@ +#ifndef __WYSF_PKT_ENTRY_H__ +#define __WYSF_PKT_ENTRY_H__ + +#include <MESA/MESA_htable.h> +#include <MESA/field_stat2.h> +#include <MESA/MESA_handle_logger.h> + +#include <map> +#include <string> + +#include <MESA/stream_inc/gdev_keepalive.h> +#include "wy_singleflow_keepalive.h" +#include "wy_singleflow_broadcast.h" + +#define WYSF_CONFIG_FILE "./conf/wy_single_flow.conf" + +#ifndef __FILENAME__ +#define __FILENAME__ __FILE__ +#endif +#define MESA_RUNTIME_LOGV3(handle, lv, fmt, args...) \ + MESA_handle_runtime_log((handle), (lv), "WYSF", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args) + +#define WYSF_MAGIC_NUMBER 0x6E7A + +enum WYSF_FSSTAT_STATUS +{ + WYSF_FSSTAT_STATUS_FLOW_TABLE=0, + WYSF_FSSTAT_STATUS_ACTIVE_FUNC, + WYSF_FSSTAT_STATUS_KEEPALIVE, + + WYSF_FSSTAT_STATUS_MAX, +}; + +enum WYSF_FSSTAT_FIELD +{ + WYSF_FSSTAT_FIELD_FLOWS_TOTAL=0, + WYSF_FSSTAT_FIELD_FLOWS_INTERCEPT, + WYSF_FSSTAT_FIELD_FLOWS_BACK, + WYSF_FSSTAT_FIELD_INTER_BYTES, + WYSF_FSSTAT_FIELD_SEND_BROADCAST, + WYSF_FSSTAT_FIELD_RECV_BROADCAST, + + WYSF_FSSTAT_FIELD_MAX, +}; + +struct wysf_stream_context +{ + struct function_node *func_node; +}; + +struct wysf_global_info +{ + void *log_runtime; + MESA_htable_handle stream_tuple_mapping; + int32_t flowhtable_slots; + int32_t flowhtable_expires; + int32_t htable_locknum; + int32_t bfd_dest_port; + int32_t bfd_timeout_ms; + int32_t bfd_timeout_times; + struct event_base *alive_evbase; + + int32_t alive_func_nodes; + int32_t func_group_num; + struct judian_as_group *judian_group_list[1024]; + int32_t broadcast_udp_servert_sockfd; + int32_t broadcast_udp_server_port; + + std::map<std::string, struct judian_as_group*> *group_judian; + std::map<int32_t, struct function_node*> *func_keepalive; + std::map<int32_t, struct judian_as_group*> *forwardip2judian; + + screen_stat_handle_t fsstat_handle; + struct event fs_timer_output; + char fsstat_dst_ip[64]; + char fsstat_appname[16]; + char fsstat_filepath[256]; + char fsstat_histlen[256]; + u_int32_t fsstat_period; + u_int32_t fsstat_print_mode; //0-close; 1-automaticlly; 2-passively + int32_t fsstat_dst_port; + int32_t fsstat_histlen_id; + int32_t fsstat_field_ids[WYSF_FSSTAT_FIELD_MAX]; + int32_t fsstat_status_ids[WYSF_FSSTAT_STATUS_MAX]; +}; + +#endif + diff --git a/src/wy_singleflow_keepalive.cpp b/src/wy_singleflow_keepalive.cpp new file mode 100644 index 0000000..2c48705 --- /dev/null +++ b/src/wy_singleflow_keepalive.cpp @@ -0,0 +1,237 @@ +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <pthread.h> +#include <string.h> +#include <sys/ioctl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <sys/prctl.h> + +#include "wy_singleflow_entry.h" +#include "wy_singleflow_keepalive.h" + +#define DEFAULT_HOST_CAPACITY 4 +#define LOAD_BALANC_VIRT_TIMES 16 + +extern struct wysf_global_info g_wysf_global_info; + + +static void conhash_delay_destroy_timer_cb(int fd, short kind, void *userp) +{ + struct time_event *delay_event=(struct time_event *)userp; + + conhash_instance_free(delay_event->conhash); + free(delay_event); +} + +static void load_balance_common_timer_start(struct event *time_event) +{ + struct timeval tv; + + tv.tv_sec = 2; + tv.tv_usec = 0; + evtimer_add(time_event, &tv); +} + +static void conhash_handle_delay_destroy(struct event_base *evbase, struct consistent_hash *conhash) +{ + struct time_event *delay_event; + + delay_event = (struct time_event *)malloc(sizeof(struct time_event)); + delay_event->conhash = conhash; + evtimer_assign(&delay_event->timer_event, evbase, conhash_delay_destroy_timer_cb, delay_event); + load_balance_common_timer_start(&delay_event->timer_event); +} + +static void conhash_insert_dest_host(struct judian_as_group *func_group, int32_t bucketid) +{ + struct conhash_bucket bucket; + struct consistent_hash *tmphash, *newhash=NULL; + enum CONHASH_ERRCODE code; + + bucket.bucket_id = bucketid; + bucket.point_num = DEFAULT_HOST_CAPACITY * LOAD_BALANC_VIRT_TIMES;; + bucket.tag = NULL; + + newhash = conhash_instance_copy(func_group->conhash); + code = conhash_insert_bucket(newhash, &bucket); + assert(code == CONHASH_OK); + + tmphash = func_group->conhash; + func_group->conhash = newhash; + conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase , tmphash); +} + +static void conhash_remove_dest_host(struct judian_as_group *func_group, int32_t bucketid) +{ + struct consistent_hash *tmphash, *newhash=NULL; + enum CONHASH_ERRCODE code; + + newhash = conhash_instance_copy(func_group->conhash); + code = conhash_remove_bucket(newhash, bucketid, NULL); + assert(code == CONHASH_OK || code==CONHASH_BUCKET_NOT_FOUND); + + tmphash = func_group->conhash; + func_group->conhash = newhash; + conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase, tmphash); +} + +static int udp_keepalive_client_socket_init(void) +{ + int on = 1, socket_fd; + + if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0) + { + return -1; + } + on = 5242880; + setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on)); + evutil_make_socket_nonblocking(socket_fd); + return socket_fd; +} + +void func_client_read_bfdres_cb(evutil_socket_t fd, short events, void *arg) +{ + struct function_node *nodes=(struct function_node *)arg; + struct sockaddr_in saddr; + char buffer[2048]; + int from_len, buf_len; + SAPP_TLV_T tlv_value; + + from_len = sizeof (struct sockaddr); + while((buf_len = recvfrom (fd, buffer, 65536, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0) + { + //BFD packet validate + nodes->retry_times = 0; + if(!nodes->conhash_inserted) + { + conhash_insert_dest_host(nodes->parent, nodes->ip_as_bucketid); + nodes->conhash_inserted = 1; + if(g_wysf_global_info.alive_func_nodes == 0) + { + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.int_value = 1; + tlv_value.length = sizeof(int); + gdev_keepalive_set_opt(&tlv_value); + } + g_wysf_global_info.alive_func_nodes += 1; + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s inserted", nodes->dst_ip_str); + } + from_len = sizeof (struct sockaddr); + } +} + +void func_client_send_bfd_packet(struct function_node *nodes, const char *data, int datalen, struct sockaddr *dstaddr) +{ + nodes->retry_times++; + + if(datalen != sendto(nodes->udp_sockfd, data, datalen, 0, (struct sockaddr*)&nodes->sinaddr, sizeof(struct sockaddr))) + { + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "sendto ip %s failed: %s", nodes->dst_ip_str, strerror(errno)); + } +} + +static void func_client_alive_timer_cb(int fd, short kind, void *userp) +{ + struct function_node *nodes=(struct function_node *)userp; + struct timeval tv; + SAPP_TLV_T tlv_value; + + func_client_send_bfd_packet(nodes, NULL, 0, (struct sockaddr *)&nodes->sinaddr); //todo + + if(nodes->conhash_inserted && nodes->retry_times >= g_wysf_global_info.bfd_timeout_times) + { + if(--g_wysf_global_info.alive_func_nodes == 0) + { + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.int_value = 0; + tlv_value.length = sizeof(int); + gdev_keepalive_set_opt(&tlv_value); + } + conhash_remove_dest_host(nodes->parent, nodes->ip_as_bucketid); + nodes->conhash_inserted = 0; + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s removed", nodes->dst_ip_str); + } + + tv.tv_sec = 0; + tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000; + evtimer_add(&nodes->alive_detect_timer, &tv); +} + +static void finger_fs_output_timer_cb(int fd, short kind, void *userp) +{ + struct timeval tv; + u_int32_t bucket_num=0; + SAPP_TLV_T tlv_value; + + for(int32_t i=0; i<g_wysf_global_info.func_group_num; i++) + { + bucket_num += conhash_get_bucket_num(g_wysf_global_info.judian_group_list[i]->conhash); + } + FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_ACTIVE_FUNC], 0, FS_OP_SET, bucket_num); + + bucket_num = MESA_htable_get_elem_num(g_wysf_global_info.stream_tuple_mapping); + FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_FLOW_TABLE], 0, FS_OP_SET, bucket_num); + + memset(&tlv_value, 0, sizeof(SAPP_TLV_T)); + tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH; + tlv_value.length = sizeof(int); + gdev_keepalive_get_opt(&tlv_value); + FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_KEEPALIVE], 0, FS_OP_SET, tlv_value.int_value); + + FS_passive_output(g_wysf_global_info.fsstat_handle); + tv.tv_sec = g_wysf_global_info.fsstat_period; + tv.tv_usec = 0; + evtimer_add(&g_wysf_global_info.fs_timer_output, &tv); +} + +void *thread_funcnode_keepalive(void *arg) +{ + struct event_base *evbase; + struct judian_as_group *func_group; + struct timeval tv; + + prctl(PR_SET_NAME, "wysf_alive"); + + g_wysf_global_info.alive_evbase = evbase = event_base_new(); + + for(int i=0; i<g_wysf_global_info.func_group_num; i++) + { + func_group = g_wysf_global_info.judian_group_list[i]; + for(int j=0; j<func_group->func_nodes_num; j++) + { + func_group->func_nodes[i].udp_sockfd = udp_keepalive_client_socket_init(); + event_assign(&func_group->func_nodes[i].msgevent, evbase, func_group->func_nodes[i].udp_sockfd, EV_READ|EV_PERSIST, + func_client_read_bfdres_cb, &func_group->func_nodes[i]); + event_add(&func_group->func_nodes[i].msgevent, NULL); + + func_client_send_bfd_packet(&func_group->func_nodes[i], NULL, 0, (struct sockaddr *)&func_group->func_nodes[i].sinaddr); //todo + + tv.tv_sec = 0; + tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000; + evtimer_assign(&func_group->func_nodes[i].alive_detect_timer, evbase, func_client_alive_timer_cb, &func_group->func_nodes[i]); + evtimer_add(&func_group->func_nodes[i].alive_detect_timer, &tv); + } + } + + evtimer_assign(&g_wysf_global_info.fs_timer_output, evbase, finger_fs_output_timer_cb, NULL); + tv.tv_sec = g_wysf_global_info.fsstat_period; + tv.tv_usec = 0; + evtimer_add(&g_wysf_global_info.fs_timer_output, &tv); + + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "keepalive thread started."); + event_base_dispatch(evbase); + printf("Libevent dispath error, should not run here.\n"); + MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here."); + assert(0);return NULL; +} + diff --git a/src/wy_singleflow_keepalive.h b/src/wy_singleflow_keepalive.h new file mode 100644 index 0000000..00937a0 --- /dev/null +++ b/src/wy_singleflow_keepalive.h @@ -0,0 +1,53 @@ +#ifndef __WY_SINGLE_FLOW_KEEPALIVE_H__ +#define __WY_SINGLE_FLOW_KEEPALIVE_H__ + +#include <event.h> + +#include "nirvana_conhash.h" + +struct time_event +{ + struct event timer_event; + struct consistent_hash *conhash; +}; + +struct function_node +{ + char *dst_ip_str; + int32_t udp_sockfd; + int32_t ip_as_bucketid; + int32_t conhash_inserted; + int32_t retry_times; + struct sockaddr_in sinaddr; + + struct event msgevent; + struct event alive_detect_timer; + struct event send_bfd_timer; + + struct judian_as_group *parent; +}; + +struct forward_nodes +{ + char *dst_ip_str; + int32_t udp_sockfd; + int32_t dst_ip; + struct sockaddr_in sinaddr; + struct judian_as_group *parent; +}; + +struct judian_as_group +{ + char groupname[64]; + struct function_node *func_nodes; + struct forward_nodes *fwd_nodes; + int32_t func_nodes_num; + int32_t fwd_nodes_num; + struct consistent_hash *conhash; +}; + + +void *thread_funcnode_keepalive(void *arg); + +#endif + |
