summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author[email protected] <[email protected]>2021-11-02 12:34:05 +0800
committer[email protected] <[email protected]>2021-11-02 12:34:05 +0800
commit31f55f0b88d4af34a8a36497f5e49c69b88b2fbf (patch)
tree63515b3ceb361369cdc88ae6db1a808fc80e5b42 /src
Diffstat (limited to 'src')
-rw-r--r--src/Makefile22
-rw-r--r--src/nirvana_conhash.cpp483
-rw-r--r--src/nirvana_conhash.h50
-rw-r--r--src/nirvana_murmurhash.cpp99
-rw-r--r--src/nirvana_murmurhash.h16
-rw-r--r--src/wy_singleflow_broadcast.cpp126
-rw-r--r--src/wy_singleflow_broadcast.h30
-rw-r--r--src/wy_singleflow_entry.cpp324
-rw-r--r--src/wy_singleflow_entry.h88
-rw-r--r--src/wy_singleflow_keepalive.cpp237
-rw-r--r--src/wy_singleflow_keepalive.h53
11 files changed, 1528 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 0000000..801e452
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,22 @@
+CC=g++
+CCC=g++
+
+CFLAGS = -Wall -g -fPIC
+
+INC_PATH = -I../include
+LIBS = -lMESA_prof_load -lMESA_handle_logger -lMESA_field_stat2 -lMESA_htable
+
+OBJS = nirvana_conhash.o nirvana_murmurhash.o wy_singleflow_entry.o wy_singleflow_keepalive.o
+
+TARGET=wy_singleflow.so
+
+$(TARGET):$(OBJS)
+ $(CCC) -fPIC -shared $^ -o $@ $(LIBS)
+# cp $@ /home/zhangcw/sapp_jcq_dumpfile/plug/business/stream_packets_cache/
+
+.cpp.o:
+ $(CC) -c $(CFLAGS) $(INC_PATH) $<
+
+clean:
+ rm -f $(TARGET) $(OBJS)
+
diff --git a/src/nirvana_conhash.cpp b/src/nirvana_conhash.cpp
new file mode 100644
index 0000000..d687242
--- /dev/null
+++ b/src/nirvana_conhash.cpp
@@ -0,0 +1,483 @@
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <math.h>
+
+#include <map>
+
+#include "nirvana_murmurhash.h"
+#include "nirvana_conhash.h"
+
+using namespace std;
+
+#ifndef offsetof
+#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
+#endif
+
+struct ch_point
+{
+ u_int32_t bucket_id;
+ u_int32_t bucket_index; /* which backend it belongs to, use IP address */
+ u_int64_t hit_cnt;
+ u_int64_t point_val; /* hash code of this nodes, it is used to map node to consistent hash cycle */
+};
+
+struct ch_bucket_inner
+{
+ struct conhash_bucket bucket;
+
+ struct ch_point point_array[CONHASH_MAX_POINTS_PER_BUCKET]; //��ʹ��point_val��Աȥ��
+ int32_t is_valid;
+ int32_t bucket_index;
+ u_int64_t hit_cnt;
+};
+
+struct consistent_hash
+{
+ struct ch_bucket_inner *bucket_array;
+ u_int32_t bucket_array_size;
+ u_int32_t bucket_cnt;
+ u_int32_t point_num;
+ struct ch_point *point_array;
+ map<u_int32_t, u_int32_t> *map_id_index;
+};
+
+//coefficient of variation of the RMSD
+double conhash_calulate_CVRSMD(struct consistent_hash *ch)
+{
+ struct ch_bucket_inner* b=NULL;
+ u_int32_t i=0;
+ double sum_hit=0, sum_point=0;
+ double MSE=0,RMSD=0,CVRMSD=0;
+ for(i=0;i<ch->bucket_array_size;i++)
+ {
+ b=ch->bucket_array+i;
+ if(b->is_valid==0)
+ {
+ continue;
+ }
+ sum_hit+=(double)b->hit_cnt;
+ sum_point+=(double)b->bucket.point_num;
+ }
+ for(i=0;i<ch->bucket_array_size;i++)
+ {
+ b=ch->bucket_array+i;
+ if(b->is_valid==0)
+ {
+ continue;
+ }
+ MSE+=pow(b->hit_cnt-(b->bucket.point_num*sum_hit)/sum_point,2);
+ }
+ RMSD = sqrt(MSE/ch->bucket_cnt);
+ CVRMSD = RMSD/(sum_hit/ch->bucket_cnt);
+ return CVRMSD;
+}
+
+static int qsort_cmp_by_key_increase(const void* a, const void* b)
+{
+ if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val)
+ {
+ return 1;
+ }
+ else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val)
+ {
+ return 0;
+ }
+ else
+ {
+ return -1;
+ }
+}
+
+static int qsort_cmp_by_key_decrease(const void* a, const void* b)
+{
+ if(((const struct ch_point*)a)->point_val > ((const struct ch_point*)b)->point_val)
+ {
+ return -1;
+ }
+ else if(((const struct ch_point*)a)->point_val == ((const struct ch_point*)b)->point_val)
+ {
+ return 0;
+ }
+ else
+ {
+ return 1;
+ }
+}
+
+// (vector<int>& nums, int target)
+static u_int32_t search_up_bound(u_int64_t target, const void *base,
+ int32_t nmemb, size_t size,int val_offset)
+{
+ int32_t low = 0, high = nmemb-1, mid;
+
+ // Invariant: the desired index is between [low, high+1]
+
+ while (low <= high)
+ {
+ mid = low + (high-low)/2;
+ if(*(u_int64_t*)((char*)base+size*mid+val_offset) < target)
+ {
+ low = mid+1;
+ }
+ else
+ {
+ high = mid-1;
+ }
+ }
+ if(low == nmemb)
+ {
+ low=0;
+ }
+ // (1) At this point, low > high. That is, low >= high+1
+ // (2) From the invariant, we know that the index is between [low, high+1], so low <= high+1. Follwing from (1), now we know low == high+1.
+ // (3) Following from (2), the index is between [low, high+1] = [low, low], which means that low is the desired index
+ // Therefore, we return low as the answer. You can also return high+1 as the result, since low == high+1
+ return low;
+}
+
+//��֤��ͬ��bucket_id&&point_index������ͬ��point_id
+static u_int64_t bucket_gen_uniq_point(struct ch_bucket_inner *inner_bucket, u_int32_t cur_point_index)
+{
+ u_int64_t x=0, seed;
+ u_int32_t hash, i=0;
+
+ seed = (((u_int64_t)cur_point_index)<<32) | inner_bucket->bucket.bucket_id;
+ hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193);
+ x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
+
+ while(i != cur_point_index)
+ {
+ for(i=0; i<cur_point_index; i++)
+ {
+ if(x == inner_bucket->point_array[i].point_val) //��ͻ
+ {
+ seed = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
+ hash = murmurhash2(&seed, sizeof(u_int64_t), 515880193);
+ x = (((u_int64_t)hash)<<32) | inner_bucket->bucket.bucket_id;
+ i = 0;
+ break;
+ }
+ }
+ }
+ inner_bucket->point_array[cur_point_index].point_val = x;
+ return x;
+}
+
+u_int32_t conhash_get_bucket_num(struct consistent_hash *ch)
+{
+ return ch->bucket_cnt;
+}
+
+struct consistent_hash *conhash_instance_new(const struct conhash_bucket *buckets, uint32_t bucket_num)
+{
+ struct consistent_hash *ch=NULL;
+ u_int32_t i, j, k;
+ u_int64_t randval;
+
+ for(i=0; i<bucket_num; i++)
+ {
+ if(buckets[i].point_num > CONHASH_MAX_POINTS_PER_BUCKET)
+ {
+ return NULL;
+ }
+ }
+ ch = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash));
+
+ /*buckets*/
+ ch->map_id_index = new map<u_int32_t, u_int32_t>;
+ ch->bucket_array = (struct ch_bucket_inner*)calloc(1, sizeof(struct ch_bucket_inner)*bucket_num);
+ for(i=0; i<bucket_num; i++)
+ {
+ memcpy(&(ch->bucket_array[i].bucket), &buckets[i], sizeof(struct conhash_bucket));
+ ch->bucket_array[i].is_valid = 1;
+ ch->bucket_array[i].bucket_index = i;
+ ch->point_num += buckets[i].point_num;
+ ch->map_id_index->insert(make_pair(buckets[i].bucket_id, i));
+ }
+ ch->bucket_cnt = bucket_num;
+
+ /*global points*/
+ ch->point_array = (struct ch_point*)calloc(1, sizeof(struct ch_point)*ch->point_num);
+ ch->bucket_array_size = bucket_num;
+ for(i=0, k=0; i<ch->bucket_array_size; i++)
+ {
+ for(j=0; j<ch->bucket_array[i].bucket.point_num; j++,k++)
+ {
+ randval = bucket_gen_uniq_point(&ch->bucket_array[i], j);
+ ch->point_array[k].bucket_id = ch->bucket_array[i].bucket.bucket_id;
+ ch->point_array[k].bucket_index = i;
+ ch->point_array[k].point_val = randval;
+ ch->point_array[k].hit_cnt = 0;
+ }
+ qsort(ch->bucket_array[i].point_array, ch->bucket_array[i].bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease);
+ }
+ qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase);
+ return ch;
+}
+
+void conhash_instance_free(struct consistent_hash *ch)
+{
+ free(ch->point_array);
+ free(ch->bucket_array);
+ delete ch->map_id_index;
+ free(ch);
+}
+
+struct consistent_hash *conhash_instance_copy(struct consistent_hash *ch)
+{
+ struct consistent_hash *copy;
+
+ copy = (struct consistent_hash *)calloc(1, sizeof(struct consistent_hash));
+ copy->bucket_array_size = ch->bucket_array_size;
+ copy->bucket_cnt = ch->bucket_cnt;
+ copy->point_num = ch->point_num;
+ copy->bucket_array = (struct ch_bucket_inner*)calloc(sizeof(struct ch_bucket_inner), ch->bucket_array_size);
+ memcpy(copy->bucket_array, ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size);
+
+ copy->point_array = (struct ch_point*)calloc(sizeof(struct ch_point), copy->point_num);
+ memcpy(copy->point_array, ch->point_array, sizeof(struct ch_point)*copy->point_num);
+
+ copy->map_id_index = new map<u_int32_t, u_int32_t>;
+ copy->map_id_index->insert(ch->map_id_index->begin(), ch->map_id_index->end());
+ return copy;
+}
+
+static enum CONHASH_ERRCODE conhash_add_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, int32_t add_points)
+{
+ u_int64_t randval;
+
+ if(inner_bucket->bucket.point_num + add_points > CONHASH_MAX_POINTS_PER_BUCKET)
+ {
+ assert(0);return CONHASH_ERR_INVALID_ARGS;
+ }
+
+ ch->point_array = (struct ch_point *)realloc(ch->point_array,sizeof(struct ch_point)*(ch->point_num+add_points));
+
+ for(u_int32_t i=ch->point_num; i<ch->point_num+add_points; i++)
+ {
+ randval = bucket_gen_uniq_point(inner_bucket, inner_bucket->bucket.point_num);
+ inner_bucket->bucket.point_num++;
+ ch->point_array[i].bucket_id = inner_bucket->bucket.bucket_id;
+ ch->point_array[i].bucket_index = inner_bucket->bucket_index;
+ ch->point_array[i].point_val = randval;
+ ch->point_array[i].hit_cnt = 0;
+ }
+ ch->point_num += add_points;
+ qsort(inner_bucket->point_array, inner_bucket->bucket.point_num, sizeof(struct ch_point), qsort_cmp_by_key_decrease);
+ qsort(ch->point_array, ch->point_num, sizeof(struct ch_point), qsort_cmp_by_key_increase);
+ return CONHASH_OK;
+}
+
+static enum CONHASH_ERRCODE conhash_del_points(struct consistent_hash *ch, struct ch_bucket_inner *inner_bucket, u_int32_t del_points)
+{
+ struct ch_point *tmp_points;
+ u_int32_t i, j, removed;
+
+ if(inner_bucket->bucket.point_num == 0)
+ {
+ return CONHASH_OK;
+ }
+ if(inner_bucket->bucket.point_num < del_points)
+ {
+ assert(0);return CONHASH_ERR_INVALID_ARGS;
+ }
+
+ tmp_points = (struct ch_point*)malloc(sizeof(struct ch_point)*ch->point_num);
+ memcpy(tmp_points, ch->point_array, sizeof(struct ch_point)*ch->point_num);
+
+ for(i=0,j=0,removed=0; i<ch->point_num; i++)
+ {
+ if(removed<del_points && tmp_points[i].bucket_id==inner_bucket->bucket.bucket_id)
+ {
+ assert(inner_bucket->point_array[inner_bucket->bucket.point_num-1].point_val == tmp_points[i].point_val);
+ inner_bucket->bucket.point_num--;
+ removed++;
+ continue;
+ }
+ memcpy(&ch->point_array[j], &tmp_points[i], sizeof(struct ch_point));
+ j++;
+ }
+ assert(removed == del_points);
+ free(tmp_points);
+ ch->point_num -= del_points;
+ //Sort is unnecessary after deletion.
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch, const struct conhash_bucket *bucket)
+{
+ struct ch_bucket_inner *inner_bucket=NULL;
+ u_int32_t i, bucket_index;
+ map<u_int32_t, u_int32_t>::iterator iter;
+ enum CONHASH_ERRCODE code;
+
+ if(bucket->point_num <= 0)
+ {
+ return CONHASH_ERR_INVALID_ARGS;
+ }
+ if((iter=ch->map_id_index->find(bucket->bucket_id)) != ch->map_id_index->end())
+ {
+ return CONHASH_BUCKET_ALREADY_EXIST;
+ }
+
+ if(ch->bucket_cnt < ch->bucket_array_size)
+ {
+ for(i=0; i<ch->bucket_array_size; i++)
+ {
+ if(ch->bucket_array[i].is_valid == 0)
+ {
+ bucket_index = i;
+ break;
+ }
+ }
+ assert(i < ch->bucket_array_size && ch->bucket_array[bucket_index].bucket.point_num==0); //һ�����ҵ�
+
+ inner_bucket = &ch->bucket_array[bucket_index];
+ }
+ else
+ {
+ assert(ch->bucket_array_size == ch->bucket_cnt);
+ bucket_index = ch->bucket_cnt;
+ ch->bucket_array_size = ch->bucket_cnt + 1;
+ ch->bucket_array = (struct ch_bucket_inner*)realloc(ch->bucket_array, sizeof(struct ch_bucket_inner)*ch->bucket_array_size);
+ memset(&ch->bucket_array[bucket_index], 0, sizeof(struct ch_bucket_inner));
+ inner_bucket = &ch->bucket_array[bucket_index];
+ }
+ inner_bucket->bucket.bucket_id = bucket->bucket_id;
+ inner_bucket->bucket.tag = bucket->tag;
+ inner_bucket->bucket_index = bucket_index;
+
+ if(CONHASH_OK != (code=conhash_add_points(ch, inner_bucket, bucket->point_num)))
+ {
+ return code;
+ }
+ inner_bucket->is_valid = 1;
+ inner_bucket->bucket_index = bucket_index;
+ inner_bucket->hit_cnt = 0;
+ ch->bucket_cnt++;
+ ch->map_id_index->insert(make_pair(bucket->bucket_id, bucket_index));
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_remove_bucket(struct consistent_hash *ch, u_int32_t bucket_id, void (*free_cb)(void *tag, u_int32_t point_num))
+{
+ struct ch_bucket_inner* inner_bucket=NULL;
+ u_int32_t bucket_index;
+ map<u_int32_t, u_int32_t>::iterator iter;
+ enum CONHASH_ERRCODE code;
+
+ if((iter=ch->map_id_index->find(bucket_id)) == ch->map_id_index->end())
+ {
+ return CONHASH_BUCKET_NOT_FOUND;
+ }
+ bucket_index = iter->second;
+ assert(bucket_index < ch->bucket_array_size);
+
+ inner_bucket = &ch->bucket_array[bucket_index];
+ if(CONHASH_OK != (code=conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num)))
+ {
+ return code;
+ }
+ ch->bucket_cnt--;
+ inner_bucket->is_valid = 0;
+ if(free_cb)
+ {
+ free_cb(inner_bucket->bucket.tag, inner_bucket->bucket.point_num);
+ }
+ inner_bucket->bucket.point_num = 0;
+ inner_bucket->bucket.tag = NULL;
+ ch->map_id_index->erase(bucket_id);
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_renew_bucket(struct consistent_hash *ch, struct conhash_bucket *bucket)
+{
+ struct ch_bucket_inner* inner_bucket=NULL;
+ u_int32_t bucket_index;
+ map<u_int32_t, u_int32_t>::iterator iter;
+
+ if((iter=ch->map_id_index->find(bucket->bucket_id)) == ch->map_id_index->end())
+ {
+ assert(0);return CONHASH_BUCKET_NOT_FOUND;
+ }
+ bucket_index = iter->second;
+ assert(bucket_index < ch->bucket_array_size);
+
+ inner_bucket = &ch->bucket_array[bucket_index];
+ assert(inner_bucket->is_valid == 1);
+ inner_bucket->bucket.tag = bucket->tag;
+
+ if(inner_bucket->bucket.point_num == bucket->point_num)
+ {
+ return CONHASH_OK;
+ }
+ else if(inner_bucket->bucket.point_num < bucket->point_num)
+ {
+ return conhash_add_points(ch, inner_bucket, bucket->point_num-inner_bucket->bucket.point_num);
+ }
+ else
+ {
+ return conhash_del_points(ch, inner_bucket, inner_bucket->bucket.point_num-bucket->point_num);
+ }
+}
+
+enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const void* key, int len, struct conhash_bucket* result)
+{
+ int idx=0, bucket_index=0;
+ u_int64_t hash;
+
+ if(ch->bucket_cnt == 0)
+ {
+ return CONHASH_NO_VALID_BUCKETS;
+ }
+
+ hash = MurmurHash64A(key, len, 515880193);
+ idx = search_up_bound(hash, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val));
+ ch->point_array[idx].hit_cnt++;
+ bucket_index = ch->point_array[idx].bucket_index;
+ assert(ch->bucket_array[bucket_index].is_valid == 1);
+ ch->bucket_array[bucket_index].hit_cnt++;
+ memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket));
+ return CONHASH_OK;
+}
+
+enum CONHASH_ERRCODE conhash_lookup_bucket_int(struct consistent_hash *ch, u_int64_t randint, struct conhash_bucket* result)
+{
+ int idx=0, bucket_index=0;
+
+ if(ch->bucket_cnt == 0)
+ {
+ return CONHASH_NO_VALID_BUCKETS;
+ }
+
+ idx = search_up_bound(randint, ch->point_array, ch->point_num, sizeof(struct ch_point), offsetof(struct ch_point, point_val));
+ ch->point_array[idx].hit_cnt++;
+ bucket_index = ch->point_array[idx].bucket_index;
+ assert(ch->bucket_array[bucket_index].is_valid == 1);
+ ch->bucket_array[bucket_index].hit_cnt++;
+ memcpy(result, &(ch->bucket_array[bucket_index].bucket), sizeof(struct conhash_bucket));
+ return CONHASH_OK;
+}
+
+void conhash_dump_detail(struct consistent_hash *ch)
+{
+ /*for(u_int32_t i=0; i<ch->point_num; i++)
+ {
+ printf("bucket_id: %10u, bucket_index: %5u, point_val:%lx, hit_cnt: %lu\n", ch->point_array[i].bucket_id,
+ ch->point_array[i].bucket_index, ch->point_array[i].point_val, ch->point_array[i].hit_cnt);
+ }
+
+ printf("\n\n\n\n");*/
+ for(u_int32_t i=0; i<ch->bucket_cnt; i++)
+ {
+ if(ch->bucket_array[i].is_valid)
+ {
+ printf("bucket_id: %10u, bucket_index: %5u, hit_cnt: %lu\n", ch->bucket_array[i].bucket.bucket_id,
+ i, ch->bucket_array[i].hit_cnt);
+ }
+ }
+}
+
diff --git a/src/nirvana_conhash.h b/src/nirvana_conhash.h
new file mode 100644
index 0000000..42320b1
--- /dev/null
+++ b/src/nirvana_conhash.h
@@ -0,0 +1,50 @@
+#ifndef __NVN_CONSISTENT_HASH_H__
+#define __NVN_CONSISTENT_HASH_H__
+
+#include <stdint.h>
+
+#ifndef CONHASH_MAX_POINTS_PER_BUCKET
+#define CONHASH_MAX_POINTS_PER_BUCKET 128
+#endif
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+enum CONHASH_ERRCODE
+{
+ CONHASH_OK = 0,
+ CONHASH_ERR_INVALID_ARGS = -1,
+ CONHASH_BUCKET_NOT_FOUND = -2,
+ CONHASH_BUCKET_ALREADY_EXIST=-3,
+ CONHASH_NO_VALID_BUCKETS=-4,
+};
+
+struct conhash_bucket
+{
+ uint32_t bucket_id;
+ uint32_t point_num; /*should be not more than CONHASH_MAX_POINTS_PER_BUCKET*/
+ void* tag;
+};
+struct consistent_hash;
+
+/*API�̲߳���ȫ*/
+struct consistent_hash *conhash_instance_new(const struct conhash_bucket *buckets, uint32_t bucket_num);
+void conhash_instance_free(struct consistent_hash *ch);
+struct consistent_hash *conhash_instance_copy(struct consistent_hash *ch);
+
+enum CONHASH_ERRCODE conhash_insert_bucket(struct consistent_hash *ch,const struct conhash_bucket* bucket);
+enum CONHASH_ERRCODE conhash_remove_bucket(struct consistent_hash *ch, u_int32_t bucket_id, void (*free_cb)(void *tag, u_int32_t point_num));
+enum CONHASH_ERRCODE conhash_renew_bucket(struct consistent_hash *ch, struct conhash_bucket* bucket); /*����point_num��tag*/
+enum CONHASH_ERRCODE conhash_lookup_bucket(struct consistent_hash *ch, const void* key, int len, struct conhash_bucket *result/*OUT*/);
+enum CONHASH_ERRCODE conhash_lookup_bucket_int(struct consistent_hash *ch, u_int64_t randint, struct conhash_bucket* result);
+
+double conhash_calulate_CVRSMD(struct consistent_hash *p);
+u_int32_t conhash_get_bucket_num(struct consistent_hash *ch);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/src/nirvana_murmurhash.cpp b/src/nirvana_murmurhash.cpp
new file mode 100644
index 0000000..193bde9
--- /dev/null
+++ b/src/nirvana_murmurhash.cpp
@@ -0,0 +1,99 @@
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "nirvana_murmurhash.h"
+
+unsigned int murmurhash2(const void * key, int len, const unsigned int seed)
+{
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+
+ const unsigned int m = 0x5bd1e995;
+ const int r = 24;
+
+ // Initialize the hash to a 'random' value
+
+ unsigned int h = seed ^ len;
+
+ // Mix 4 bytes at a time into the hash
+
+ const unsigned char * data = (const unsigned char *)key;
+
+ while(len >= 4)
+ {
+ unsigned int k = *(unsigned int *)data;
+
+ k *= m;
+ k ^= k >> r;
+ k *= m;
+
+ h *= m;
+ h ^= k;
+
+ data += 4;
+ len -= 4;
+ }
+
+ // Handle the last few bytes of the input array
+
+ switch(len)
+ {
+ case 3: h ^= data[2] << 16;
+ case 2: h ^= data[1] << 8;
+ case 1: h ^= data[0];
+ h *= m;
+ default:break;
+ };
+
+ // Do a few final mixes of the hash to ensure the last few
+ // bytes are well-incorporated.
+
+ h ^= h >> 13;
+ h *= m;
+ h ^= h >> 15;
+ return h;
+}
+
+/*64-bit hash for 64-bit platforms*/
+u_int64_t MurmurHash64A(const void * key, int len, unsigned int seed)
+{
+ const uint64_t m = 0xc6a4a7935bd1e995;
+ const int r = 47;
+ uint64_t k, h = seed ^ (len * m);
+ const uint64_t * data = (const uint64_t *)key;
+ const uint64_t * end = data + (len/8);
+
+ while (data != end)
+ {
+ k = *data++;
+ k *= m;
+ k ^= k >> r;
+ k *= m;
+
+ h ^= k;
+ h *= m;
+ }
+
+ const unsigned char * data2 = (const unsigned char*)data;
+
+ switch (len & 7)
+ {
+ case 7: h ^= uint64_t(data2[6]) << 48;
+ case 6: h ^= uint64_t(data2[5]) << 40;
+ case 5: h ^= uint64_t(data2[4]) << 32;
+ case 4: h ^= uint64_t(data2[3]) << 24;
+ case 3: h ^= uint64_t(data2[2]) << 16;
+ case 2: h ^= uint64_t(data2[1]) << 8;
+ case 1: h ^= uint64_t(data2[0]);
+ h *= m;
+ };
+
+ h ^= h >> r;
+ h *= m;
+ h ^= h >> r;
+
+ return h;
+}
+
+
diff --git a/src/nirvana_murmurhash.h b/src/nirvana_murmurhash.h
new file mode 100644
index 0000000..1467943
--- /dev/null
+++ b/src/nirvana_murmurhash.h
@@ -0,0 +1,16 @@
+#ifndef __NVN_MURMURHASH_H__
+#define __NVN_MURMURHASH_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+unsigned int murmurhash2(const void * key, int len, const unsigned int seed);
+u_int64_t MurmurHash64A(const void * key, int len, unsigned int seed);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/src/wy_singleflow_broadcast.cpp b/src/wy_singleflow_broadcast.cpp
new file mode 100644
index 0000000..3cc86c4
--- /dev/null
+++ b/src/wy_singleflow_broadcast.cpp
@@ -0,0 +1,126 @@
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <sys/prctl.h>
+
+#include "wy_singleflow_entry.h"
+
+
+extern struct wysf_global_info g_wysf_global_info;
+
+static int udp_broadcast_server_socket_init(void *log_runtime, u_int16_t port)
+{
+ struct sockaddr_in servaddr;
+ int on = 1, socket_fd;
+
+ if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)//AF_INET:IPV4 AF_INET6:IPV6
+ {
+ return -1;
+ }
+ if(setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))<0)
+ {
+ MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "setsockopt for udp_port %u failed, because:%s!!", port, strerror(errno));
+ }
+ on = 5242880;
+ if(setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &on, sizeof(on))<0)
+ {
+ MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "setsockopt for udp_port %u failed, because:%s!!", port, strerror(errno));
+ }
+
+ bzero(&servaddr,sizeof(servaddr));
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ servaddr.sin_port = htons(port);
+
+ if(bind(socket_fd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0)
+ {
+ MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "bind udp socket port %u failed, because: %s!!", port, strerror(errno));
+ close(socket_fd);
+ return -2;
+ }
+ evutil_make_socket_nonblocking(socket_fd);
+ return socket_fd;
+}
+
+static long broadcast_stream_tuple_htable_cb(void *data, const uchar *key, uint size, void *user_arg)
+{
+ int32_t clj_ip=*(int32_t *)user_arg;
+ std::map<int32_t, struct judian_as_group*>::iterator iter;
+ struct judian_as_group *group;
+
+ if((iter = g_wysf_global_info.forwardip2judian->find(clj_ip)) == g_wysf_global_info.forwardip2judian->end())
+ {
+ char ipbuffer[32];
+ inet_ntop(AF_INET, &clj_ip, ipbuffer, 32);
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "lookup group for clj_ip: %s not found!!", ipbuffer);
+ return -1;
+ }
+ group = iter->second;
+
+ if(data != NULL)
+ {
+ MESA_htable_del(g_wysf_global_info.stream_tuple_mapping, key, size, NULL);
+ }
+ MESA_htable_add(g_wysf_global_info.stream_tuple_mapping, key, size, group);
+ return 0;
+}
+
+void udp_server_read_broadcast_cb(evutil_socket_t fd, short events, void *arg)
+{
+ struct udp_broadcast_msg *broadmsg;
+ char buffer[4096];
+ int from_len = 0, buf_len, clj_ip;
+ struct sockaddr_in saddr;
+ long cb_ret;
+
+ from_len = sizeof (struct sockaddr);
+ while((buf_len = recvfrom (fd, buffer, 4096, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0)
+ {
+ broadmsg = (struct udp_broadcast_msg *)buffer;
+ if((u_int32_t)buf_len!=sizeof(struct udp_broadcast_msg) || broadmsg->header.magic_number != WYSF_MAGIC_NUMBER ||
+ broadmsg->header.total_len!=sizeof(struct broadcast_message))
+ {
+ continue;
+ }
+
+ clj_ip = broadmsg->msg.clj_ip;
+ broadmsg->msg.clj_ip = 0;
+ MESA_htable_search_cb(g_wysf_global_info.stream_tuple_mapping, (uchar*)&broadmsg->msg, sizeof(struct broadcast_message),
+ broadcast_stream_tuple_htable_cb, &clj_ip, &cb_ret);
+
+ from_len = sizeof (struct sockaddr);
+ }
+}
+
+void *thread_fwdnode_breoadcast(void *arg)
+{
+ struct event_base *evbase;
+ struct event udp_server_event;
+
+ evbase = event_base_new();
+
+ if((g_wysf_global_info.broadcast_udp_servert_sockfd = udp_broadcast_server_socket_init(g_wysf_global_info.log_runtime, g_wysf_global_info.broadcast_udp_server_port)) < 0)
+ {
+ assert(0);return NULL;
+ }
+ event_assign(&udp_server_event, evbase, g_wysf_global_info.broadcast_udp_servert_sockfd,
+ EV_READ|EV_PERSIST, udp_server_read_broadcast_cb, NULL);
+ event_add(&udp_server_event, NULL);
+
+ event_base_dispatch(evbase);
+ printf("Libevent dispath error, should not run here.\n");
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
+ assert(0);return NULL;
+}
+
diff --git a/src/wy_singleflow_broadcast.h b/src/wy_singleflow_broadcast.h
new file mode 100644
index 0000000..cbdb8a4
--- /dev/null
+++ b/src/wy_singleflow_broadcast.h
@@ -0,0 +1,30 @@
+#ifndef __WYSF_BROADCAST_H__
+#define __WYSF_BROADCAST_H__
+
+struct udp_message_header
+{
+ u_int16_t magic_number;
+ u_int8_t version;
+ u_int8_t struct_type;
+ u_int32_t total_len;
+};
+
+struct broadcast_message
+{
+ int32_t src_ip;
+ int32_t dst_ip;
+ int16_t src_port;
+ int16_t dst_port;
+ int32_t clj_ip;
+};
+
+struct udp_broadcast_msg
+{
+ struct udp_message_header header;
+ struct broadcast_message msg;
+};
+
+void *thread_fwdnode_breoadcast(void *arg);
+
+#endif
+
diff --git a/src/wy_singleflow_entry.cpp b/src/wy_singleflow_entry.cpp
new file mode 100644
index 0000000..a464d8f
--- /dev/null
+++ b/src/wy_singleflow_entry.cpp
@@ -0,0 +1,324 @@
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+#include <ctype.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/ip6.h>
+#include <sys/time.h>
+#include <errno.h>
+
+#include <MESA/MESA_prof_load.h>
+#include <MESA/stream.h>
+
+#include "wy_singleflow_entry.h"
+
+struct wysf_global_info g_wysf_global_info;
+
+
+char WYSF_TCP_STREAM_PKT_ENTRY(struct streaminfo *a_stream, void **pme, int thread_seq, void *a_packet)
+{
+ struct wysf_stream_context *context=(struct wysf_stream_context *)*pme;
+
+ if(a_stream->pktstate == OP_STATE_PENDING)
+ {
+ context = *pme = (struct wysf_stream_context *)dictator_malloc(thread_seq, sizeof(struct wysf_stream_context));
+ context->func_node = NULL;
+ }
+
+
+ return APP_STATE_GIVEME;
+}
+
+
+static int wysf_register_field_stat(struct wysf_global_info *info)
+{
+ const char *field_names[WYSF_FSSTAT_FIELD_MAX]={"FlowsTotal", "FlowsIntcpt", "FlowsRtnBack", "FlowsIntBytes",
+ "BroadCstSent", "BroadCstRecv"};
+ const char *status_names[WYSF_FSSTAT_STATUS_MAX]={"FlowTable", "ActiveFuncs", "KeepAliveSW"};
+ int value;
+
+ info->fsstat_handle = FS_create_handle();
+ FS_set_para(info->fsstat_handle, OUTPUT_DEVICE, info->fsstat_filepath, strlen(info->fsstat_filepath)+1);
+ if(info->fsstat_print_mode == 1)
+ {
+ FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode));
+ }
+ else
+ {
+ FS_set_para(info->fsstat_handle, PRINT_MODE, &info->fsstat_print_mode, sizeof(info->fsstat_print_mode));
+ value = 1;
+ FS_set_para(info->fsstat_handle, FLUSH_BY_DATE, &value, sizeof(value));
+ }
+ value = info->fsstat_period;
+ FS_set_para(info->fsstat_handle, STAT_CYCLE, &value, sizeof(value));
+ value = 0;
+ FS_set_para(info->fsstat_handle, CREATE_THREAD, &value, sizeof(value));
+ FS_set_para(info->fsstat_handle, APP_NAME, info->fsstat_appname, strlen(info->fsstat_appname)+1);
+ FS_set_para(info->fsstat_handle, STATS_SERVER_IP, info->fsstat_dst_ip, strlen(info->fsstat_dst_ip)+1);
+ FS_set_para(info->fsstat_handle, STATS_SERVER_PORT, &info->fsstat_dst_port, sizeof(info->fsstat_dst_port));
+
+ for(int i=0; i<WYSF_FSSTAT_FIELD_MAX; i++)
+ {
+ info->fsstat_field_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, field_names[i]);
+ }
+ for(int i=0; i<WYSF_FSSTAT_STATUS_MAX; i++)
+ {
+ info->fsstat_status_ids[i] = FS_register(info->fsstat_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, status_names[i]);
+ }
+ FS_start(info->fsstat_handle);
+ return 0;
+}
+
+void flow_tuple_htable_data_expires(void *data)
+{
+}
+
+MESA_htable_handle init_and_create_htable(unsigned int slot_size, int expire_time, int lock_num,
+ void (*data_free)(void *data),
+ int (*data_expire_with_condition)(void *data, int eliminate_type))
+{
+ MESA_htable_create_args_t htable_args;
+
+ memset(&htable_args, 0, sizeof(MESA_htable_create_args_t));
+
+ htable_args.thread_safe = lock_num;
+ htable_args.recursive = 1;
+ htable_args.hash_slot_size = slot_size;
+ htable_args.max_elem_num = 10*slot_size;
+ htable_args.eliminate_type = HASH_ELIMINATE_ALGO_FIFO;
+ htable_args.expire_time = expire_time;
+ htable_args.data_free = data_free; //�ͷŽڵ��ڴ�
+ htable_args.data_expire_with_condition = data_expire_with_condition;
+
+ return MESA_htable_create(&htable_args, sizeof(MESA_htable_create_args_t));
+}
+
+static int udp_broadcast_client_socket_init(void *log_runtime)
+{
+ int on = 1, socket_fd;
+
+ if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)
+ {
+ MESA_RUNTIME_LOGV3(log_runtime, RLOG_LV_FATAL, "create udp socket error: %s", strerror(errno));
+ return -1;
+ }
+ on = 5242880;
+ setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on));
+ return socket_fd;
+}
+
+static int _unfold_IP_range(char* ip_range, char***ip_list, int size)
+{
+ int i=0,count=0, ret=0;
+ int range_digits[5];
+ memset(range_digits,0,sizeof(range_digits));
+ ret=sscanf(ip_range,"%d.%d.%d.%d-%d",&range_digits[0],&range_digits[1],&range_digits[2],&range_digits[3],&range_digits[4]);
+ if(ret!=4&&ret!=5)
+ {
+ return 0;
+ }
+ if(ret==4&&range_digits[4]==0)
+ {
+ range_digits[4]=range_digits[3];
+ }
+ for(i=0;i<5;i++)
+ {
+ if(range_digits[i]<0||range_digits[i]>255)
+ {
+ return 0;
+ }
+ }
+ count=range_digits[4]-range_digits[3]+1;
+ *ip_list=(char**)realloc(*ip_list, sizeof(char*)*(size+count));
+ for(i=0;i<count;i++)
+ {
+ (*ip_list)[size+i]=(char*)malloc(64);
+ snprintf((*ip_list)[size+i],64,"%d.%d.%d.%d",range_digits[0],range_digits[1],range_digits[2],range_digits[3]+i);
+ }
+ return count;
+}
+
+static int unfold_IP_range(const char* ip_range, char***ip_list)
+{
+ char *token=NULL,*sub_token=NULL,*saveptr;
+ char *buffer=(char*)calloc(sizeof(char),strlen(ip_range)+1);
+ int count=0;
+ strcpy(buffer,ip_range);
+ for (token = buffer; ; token= NULL)
+ {
+ sub_token= strtok_r(token,";", &saveptr);
+ if (sub_token == NULL)
+ break;
+ count+=_unfold_IP_range(sub_token, ip_list,count);
+ }
+ free(buffer);
+ return count;
+}
+
+struct judian_as_group *init_function_group_nodes(const char *judian_name, char **ip_list, int ipnum, struct wysf_global_info *info)
+{
+ struct judian_as_group *group;
+
+ group = (struct judian_as_group *)calloc(1, sizeof(struct judian_as_group));
+ group->conhash = conhash_instance_new(NULL, 0);
+ snprintf(group->groupname, 64, "%s", judian_name);
+
+ group->func_nodes = (struct function_node *)calloc(1, sizeof(struct function_node)*ipnum);
+ group->func_nodes_num = ipnum;
+ for(int i=0; i<ipnum; i++)
+ {
+ inet_pton(AF_INET, ip_list[i], &group->func_nodes[i].ip_as_bucketid);
+ group->func_nodes[i].parent = group;
+ group->func_nodes[i].sinaddr.sin_family = AF_INET;
+ group->func_nodes[i].sinaddr.sin_addr.s_addr = group->func_nodes[i].ip_as_bucketid;
+ group->func_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port);
+ group->func_nodes[i].dst_ip_str = ip_list[i];
+
+ info->func_keepalive->insert(std::make_pair(group->func_nodes[i].ip_as_bucketid, &group->func_nodes[i]));
+ }
+
+ assert(info->func_group_num < 1024);
+ info->judian_group_list[info->func_group_num++] = group;
+ info->group_judian->insert(std::make_pair(std::string(judian_name), group));
+ return group;
+}
+
+int init_forward_group_nodes(struct judian_as_group *group, const char *judian_name,
+ char **ip_list, int ipnum, struct wysf_global_info *info)
+{
+ group->fwd_nodes = (struct forward_nodes *)calloc(1, sizeof(struct forward_nodes)*ipnum);
+ group->fwd_nodes_num = ipnum;
+ for(int i=0; i<ipnum; i++)
+ {
+ inet_pton(AF_INET, ip_list[i], &group->fwd_nodes[i].dst_ip);
+ group->fwd_nodes[i].parent = group;
+ group->fwd_nodes[i].sinaddr.sin_family = AF_INET;
+ group->fwd_nodes[i].sinaddr.sin_addr.s_addr = group->fwd_nodes[i].dst_ip;
+ group->fwd_nodes[i].sinaddr.sin_port = htons(info->bfd_dest_port);
+ group->fwd_nodes[i].dst_ip_str = ip_list[i];
+ if((group->fwd_nodes[i].udp_sockfd = udp_broadcast_client_socket_init(info->log_runtime)) < 0)
+ {
+ return -1;
+ }
+
+ info->forwardip2judian->insert(std::make_pair(group->fwd_nodes[i].dst_ip, group));
+ }
+ return 0;
+}
+
+int load_config_judian_function_iplist(const char *config_file, struct wysf_global_info *info)
+{
+ char namebuffer[4096], ipbuffer[1024], *judian_name, *save=NULL, **node_iplist;
+ struct judian_as_group *group;
+ int ipnum;
+
+ if(0>=MESA_load_profile_string_nodef(config_file, "MODULE", "judian_name_list", namebuffer, sizeof(namebuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [MODULE]judian_name_list not found!", config_file);
+ assert(0);return -1;
+ }
+
+ for(judian_name=strtok_r(namebuffer, ";", &save); judian_name!=NULL; judian_name=strtok_r(NULL, ";", &save))
+ {
+ if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_func_ip_list", ipbuffer, sizeof(ipbuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list not found!", config_file, judian_name);
+ assert(0);return -1;
+ }
+ if((ipnum = unfold_IP_range(ipbuffer, &node_iplist)) == 0)
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_func_ip_list is empty!", config_file, judian_name);
+ continue;
+ }
+ group = init_function_group_nodes(judian_name, node_iplist, ipnum, info);
+ free(node_iplist);
+
+ if(0>=MESA_load_profile_string_nodef(config_file, judian_name, "judian_forward_ip_list", ipbuffer, sizeof(ipbuffer)))
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list not found!", config_file, judian_name);
+ assert(0);return -1;
+ }
+ if((ipnum=unfold_IP_range(ipbuffer, &node_iplist)) == 0)
+ {
+ MESA_RUNTIME_LOGV3(info->log_runtime, RLOG_LV_FATAL, "%s: [%s]judian_forward_ip_list is empty!", config_file, judian_name);
+ continue;
+ }
+ init_forward_group_nodes(group, judian_name, node_iplist, ipnum, info);
+ free(node_iplist);
+ }
+ return 0;
+}
+
+int WYSF_STREAM_PKT_INIT(void)
+{
+ char root_log_name[256];
+ pthread_t thread_desc;
+ pthread_attr_t attr;
+ int log_level;
+ SAPP_TLV_T tlv_value;
+
+ memset(&g_wysf_global_info, 0, sizeof(struct wysf_global_info));
+ g_wysf_global_info.group_judian = new std::map<std::string, struct judian_as_group*>;
+ g_wysf_global_info.func_keepalive = new std::map<int32_t, struct function_node*>;
+ g_wysf_global_info.forwardip2judian = new std::map<int32_t, struct judian_as_group*>;
+
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_NAME", root_log_name, sizeof(root_log_name), "./log/wysf_runtime.log");
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "RUN_LOG_LV", &log_level, 10);
+ g_wysf_global_info.log_runtime = MESA_create_runtime_log_handle(root_log_name, log_level);
+ if(NULL == g_wysf_global_info.log_runtime)
+ {
+ printf("MESA_create_runtime_log_handle %s failed: %s\n", root_log_name, strerror(errno));
+ return -1;
+ }
+ if(load_config_judian_function_iplist(WYSF_CONFIG_FILE, &g_wysf_global_info))
+ {
+ return -2;
+ }
+
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "broadcast_receive_port", &g_wysf_global_info.broadcast_udp_server_port, 6789);
+
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_dest_port", &g_wysf_global_info.bfd_dest_port, 8193);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_ms", &g_wysf_global_info.bfd_timeout_ms, 20);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "bfd_timeout_times", &g_wysf_global_info.bfd_timeout_times, 3);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_slots", &g_wysf_global_info.flowhtable_slots, 16777216);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_flow_expires", &g_wysf_global_info.flowhtable_expires, 120);
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "htable_locks_num", &g_wysf_global_info.htable_locknum, 67);
+
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_APPNAME", g_wysf_global_info.fsstat_appname, 16, "WYSF");
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_FILEPATH", g_wysf_global_info.fsstat_filepath, 256, "./log/wysf.fs");
+ MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_INTERVAL", &g_wysf_global_info.fsstat_period, 5);
+ MESA_load_profile_uint_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_PRINT_MODE", &g_wysf_global_info.fsstat_print_mode, 1);
+ MESA_load_profile_string_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_IP", g_wysf_global_info.fsstat_dst_ip, 64, "127.0.0.1");
+ MESA_load_profile_int_def(WYSF_CONFIG_FILE, "MODULE", "FSSTAT_LOG_DST_PORT", &g_wysf_global_info.fsstat_dst_port, 8125);
+ wysf_register_field_stat(&g_wysf_global_info);
+
+ g_wysf_global_info.stream_tuple_mapping = init_and_create_htable(g_wysf_global_info.flowhtable_slots,
+ g_wysf_global_info.flowhtable_expires, g_wysf_global_info.htable_locknum, NULL, NULL);
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ if(pthread_create(&thread_desc, &attr, thread_funcnode_keepalive, NULL))
+ {
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -5;
+ }
+ if(pthread_create(&thread_desc, &attr, thread_fwdnode_breoadcast, NULL))
+ {
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "pthread_create(): %s", strerror(errno));
+ assert(0);return -5;
+ }
+
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.int_value = 0;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_set_opt(&tlv_value);
+ return 0;
+}
+
+void WYSF_STREAM_PKT_DESTROY(void)
+{
+}
+
diff --git a/src/wy_singleflow_entry.h b/src/wy_singleflow_entry.h
new file mode 100644
index 0000000..47c5961
--- /dev/null
+++ b/src/wy_singleflow_entry.h
@@ -0,0 +1,88 @@
+#ifndef __WYSF_PKT_ENTRY_H__
+#define __WYSF_PKT_ENTRY_H__
+
+#include <MESA/MESA_htable.h>
+#include <MESA/field_stat2.h>
+#include <MESA/MESA_handle_logger.h>
+
+#include <map>
+#include <string>
+
+#include <MESA/stream_inc/gdev_keepalive.h>
+#include "wy_singleflow_keepalive.h"
+#include "wy_singleflow_broadcast.h"
+
+#define WYSF_CONFIG_FILE "./conf/wy_single_flow.conf"
+
+#ifndef __FILENAME__
+#define __FILENAME__ __FILE__
+#endif
+#define MESA_RUNTIME_LOGV3(handle, lv, fmt, args...) \
+ MESA_handle_runtime_log((handle), (lv), "WYSF", "%s:%d, " fmt, __FILENAME__, __LINE__, ##args)
+
+#define WYSF_MAGIC_NUMBER 0x6E7A
+
+enum WYSF_FSSTAT_STATUS
+{
+ WYSF_FSSTAT_STATUS_FLOW_TABLE=0,
+ WYSF_FSSTAT_STATUS_ACTIVE_FUNC,
+ WYSF_FSSTAT_STATUS_KEEPALIVE,
+
+ WYSF_FSSTAT_STATUS_MAX,
+};
+
+enum WYSF_FSSTAT_FIELD
+{
+ WYSF_FSSTAT_FIELD_FLOWS_TOTAL=0,
+ WYSF_FSSTAT_FIELD_FLOWS_INTERCEPT,
+ WYSF_FSSTAT_FIELD_FLOWS_BACK,
+ WYSF_FSSTAT_FIELD_INTER_BYTES,
+ WYSF_FSSTAT_FIELD_SEND_BROADCAST,
+ WYSF_FSSTAT_FIELD_RECV_BROADCAST,
+
+ WYSF_FSSTAT_FIELD_MAX,
+};
+
+struct wysf_stream_context
+{
+ struct function_node *func_node;
+};
+
+struct wysf_global_info
+{
+ void *log_runtime;
+ MESA_htable_handle stream_tuple_mapping;
+ int32_t flowhtable_slots;
+ int32_t flowhtable_expires;
+ int32_t htable_locknum;
+ int32_t bfd_dest_port;
+ int32_t bfd_timeout_ms;
+ int32_t bfd_timeout_times;
+ struct event_base *alive_evbase;
+
+ int32_t alive_func_nodes;
+ int32_t func_group_num;
+ struct judian_as_group *judian_group_list[1024];
+ int32_t broadcast_udp_servert_sockfd;
+ int32_t broadcast_udp_server_port;
+
+ std::map<std::string, struct judian_as_group*> *group_judian;
+ std::map<int32_t, struct function_node*> *func_keepalive;
+ std::map<int32_t, struct judian_as_group*> *forwardip2judian;
+
+ screen_stat_handle_t fsstat_handle;
+ struct event fs_timer_output;
+ char fsstat_dst_ip[64];
+ char fsstat_appname[16];
+ char fsstat_filepath[256];
+ char fsstat_histlen[256];
+ u_int32_t fsstat_period;
+ u_int32_t fsstat_print_mode; //0-close; 1-automaticlly; 2-passively
+ int32_t fsstat_dst_port;
+ int32_t fsstat_histlen_id;
+ int32_t fsstat_field_ids[WYSF_FSSTAT_FIELD_MAX];
+ int32_t fsstat_status_ids[WYSF_FSSTAT_STATUS_MAX];
+};
+
+#endif
+
diff --git a/src/wy_singleflow_keepalive.cpp b/src/wy_singleflow_keepalive.cpp
new file mode 100644
index 0000000..2c48705
--- /dev/null
+++ b/src/wy_singleflow_keepalive.cpp
@@ -0,0 +1,237 @@
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <sys/prctl.h>
+
+#include "wy_singleflow_entry.h"
+#include "wy_singleflow_keepalive.h"
+
+#define DEFAULT_HOST_CAPACITY 4
+#define LOAD_BALANC_VIRT_TIMES 16
+
+extern struct wysf_global_info g_wysf_global_info;
+
+
+static void conhash_delay_destroy_timer_cb(int fd, short kind, void *userp)
+{
+ struct time_event *delay_event=(struct time_event *)userp;
+
+ conhash_instance_free(delay_event->conhash);
+ free(delay_event);
+}
+
+static void load_balance_common_timer_start(struct event *time_event)
+{
+ struct timeval tv;
+
+ tv.tv_sec = 2;
+ tv.tv_usec = 0;
+ evtimer_add(time_event, &tv);
+}
+
+static void conhash_handle_delay_destroy(struct event_base *evbase, struct consistent_hash *conhash)
+{
+ struct time_event *delay_event;
+
+ delay_event = (struct time_event *)malloc(sizeof(struct time_event));
+ delay_event->conhash = conhash;
+ evtimer_assign(&delay_event->timer_event, evbase, conhash_delay_destroy_timer_cb, delay_event);
+ load_balance_common_timer_start(&delay_event->timer_event);
+}
+
+static void conhash_insert_dest_host(struct judian_as_group *func_group, int32_t bucketid)
+{
+ struct conhash_bucket bucket;
+ struct consistent_hash *tmphash, *newhash=NULL;
+ enum CONHASH_ERRCODE code;
+
+ bucket.bucket_id = bucketid;
+ bucket.point_num = DEFAULT_HOST_CAPACITY * LOAD_BALANC_VIRT_TIMES;;
+ bucket.tag = NULL;
+
+ newhash = conhash_instance_copy(func_group->conhash);
+ code = conhash_insert_bucket(newhash, &bucket);
+ assert(code == CONHASH_OK);
+
+ tmphash = func_group->conhash;
+ func_group->conhash = newhash;
+ conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase , tmphash);
+}
+
+static void conhash_remove_dest_host(struct judian_as_group *func_group, int32_t bucketid)
+{
+ struct consistent_hash *tmphash, *newhash=NULL;
+ enum CONHASH_ERRCODE code;
+
+ newhash = conhash_instance_copy(func_group->conhash);
+ code = conhash_remove_bucket(newhash, bucketid, NULL);
+ assert(code == CONHASH_OK || code==CONHASH_BUCKET_NOT_FOUND);
+
+ tmphash = func_group->conhash;
+ func_group->conhash = newhash;
+ conhash_handle_delay_destroy(g_wysf_global_info.alive_evbase, tmphash);
+}
+
+static int udp_keepalive_client_socket_init(void)
+{
+ int on = 1, socket_fd;
+
+ if((socket_fd = socket(AF_INET,SOCK_DGRAM,0))<0)
+ {
+ return -1;
+ }
+ on = 5242880;
+ setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &on, sizeof(on));
+ evutil_make_socket_nonblocking(socket_fd);
+ return socket_fd;
+}
+
+void func_client_read_bfdres_cb(evutil_socket_t fd, short events, void *arg)
+{
+ struct function_node *nodes=(struct function_node *)arg;
+ struct sockaddr_in saddr;
+ char buffer[2048];
+ int from_len, buf_len;
+ SAPP_TLV_T tlv_value;
+
+ from_len = sizeof (struct sockaddr);
+ while((buf_len = recvfrom (fd, buffer, 65536, 0, (struct sockaddr*)&saddr, (socklen_t *)&from_len))>0)
+ {
+ //BFD packet validate
+ nodes->retry_times = 0;
+ if(!nodes->conhash_inserted)
+ {
+ conhash_insert_dest_host(nodes->parent, nodes->ip_as_bucketid);
+ nodes->conhash_inserted = 1;
+ if(g_wysf_global_info.alive_func_nodes == 0)
+ {
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.int_value = 1;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_set_opt(&tlv_value);
+ }
+ g_wysf_global_info.alive_func_nodes += 1;
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s inserted", nodes->dst_ip_str);
+ }
+ from_len = sizeof (struct sockaddr);
+ }
+}
+
+void func_client_send_bfd_packet(struct function_node *nodes, const char *data, int datalen, struct sockaddr *dstaddr)
+{
+ nodes->retry_times++;
+
+ if(datalen != sendto(nodes->udp_sockfd, data, datalen, 0, (struct sockaddr*)&nodes->sinaddr, sizeof(struct sockaddr)))
+ {
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "sendto ip %s failed: %s", nodes->dst_ip_str, strerror(errno));
+ }
+}
+
+static void func_client_alive_timer_cb(int fd, short kind, void *userp)
+{
+ struct function_node *nodes=(struct function_node *)userp;
+ struct timeval tv;
+ SAPP_TLV_T tlv_value;
+
+ func_client_send_bfd_packet(nodes, NULL, 0, (struct sockaddr *)&nodes->sinaddr); //todo
+
+ if(nodes->conhash_inserted && nodes->retry_times >= g_wysf_global_info.bfd_timeout_times)
+ {
+ if(--g_wysf_global_info.alive_func_nodes == 0)
+ {
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.int_value = 0;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_set_opt(&tlv_value);
+ }
+ conhash_remove_dest_host(nodes->parent, nodes->ip_as_bucketid);
+ nodes->conhash_inserted = 0;
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_DEBUG, "keepalive ip %s removed", nodes->dst_ip_str);
+ }
+
+ tv.tv_sec = 0;
+ tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000;
+ evtimer_add(&nodes->alive_detect_timer, &tv);
+}
+
+static void finger_fs_output_timer_cb(int fd, short kind, void *userp)
+{
+ struct timeval tv;
+ u_int32_t bucket_num=0;
+ SAPP_TLV_T tlv_value;
+
+ for(int32_t i=0; i<g_wysf_global_info.func_group_num; i++)
+ {
+ bucket_num += conhash_get_bucket_num(g_wysf_global_info.judian_group_list[i]->conhash);
+ }
+ FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_ACTIVE_FUNC], 0, FS_OP_SET, bucket_num);
+
+ bucket_num = MESA_htable_get_elem_num(g_wysf_global_info.stream_tuple_mapping);
+ FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_FLOW_TABLE], 0, FS_OP_SET, bucket_num);
+
+ memset(&tlv_value, 0, sizeof(SAPP_TLV_T));
+ tlv_value.type = GDEV_KEEPALIVE_OPT_GLOBAL_SWITCH;
+ tlv_value.length = sizeof(int);
+ gdev_keepalive_get_opt(&tlv_value);
+ FS_operate(g_wysf_global_info.fsstat_handle, g_wysf_global_info.fsstat_status_ids[WYSF_FSSTAT_STATUS_KEEPALIVE], 0, FS_OP_SET, tlv_value.int_value);
+
+ FS_passive_output(g_wysf_global_info.fsstat_handle);
+ tv.tv_sec = g_wysf_global_info.fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&g_wysf_global_info.fs_timer_output, &tv);
+}
+
+void *thread_funcnode_keepalive(void *arg)
+{
+ struct event_base *evbase;
+ struct judian_as_group *func_group;
+ struct timeval tv;
+
+ prctl(PR_SET_NAME, "wysf_alive");
+
+ g_wysf_global_info.alive_evbase = evbase = event_base_new();
+
+ for(int i=0; i<g_wysf_global_info.func_group_num; i++)
+ {
+ func_group = g_wysf_global_info.judian_group_list[i];
+ for(int j=0; j<func_group->func_nodes_num; j++)
+ {
+ func_group->func_nodes[i].udp_sockfd = udp_keepalive_client_socket_init();
+ event_assign(&func_group->func_nodes[i].msgevent, evbase, func_group->func_nodes[i].udp_sockfd, EV_READ|EV_PERSIST,
+ func_client_read_bfdres_cb, &func_group->func_nodes[i]);
+ event_add(&func_group->func_nodes[i].msgevent, NULL);
+
+ func_client_send_bfd_packet(&func_group->func_nodes[i], NULL, 0, (struct sockaddr *)&func_group->func_nodes[i].sinaddr); //todo
+
+ tv.tv_sec = 0;
+ tv.tv_usec = g_wysf_global_info.bfd_timeout_ms * 1000;
+ evtimer_assign(&func_group->func_nodes[i].alive_detect_timer, evbase, func_client_alive_timer_cb, &func_group->func_nodes[i]);
+ evtimer_add(&func_group->func_nodes[i].alive_detect_timer, &tv);
+ }
+ }
+
+ evtimer_assign(&g_wysf_global_info.fs_timer_output, evbase, finger_fs_output_timer_cb, NULL);
+ tv.tv_sec = g_wysf_global_info.fsstat_period;
+ tv.tv_usec = 0;
+ evtimer_add(&g_wysf_global_info.fs_timer_output, &tv);
+
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "keepalive thread started.");
+ event_base_dispatch(evbase);
+ printf("Libevent dispath error, should not run here.\n");
+ MESA_RUNTIME_LOGV3(g_wysf_global_info.log_runtime, RLOG_LV_FATAL, "Libevent dispath error, should not run here.");
+ assert(0);return NULL;
+}
+
diff --git a/src/wy_singleflow_keepalive.h b/src/wy_singleflow_keepalive.h
new file mode 100644
index 0000000..00937a0
--- /dev/null
+++ b/src/wy_singleflow_keepalive.h
@@ -0,0 +1,53 @@
+#ifndef __WY_SINGLE_FLOW_KEEPALIVE_H__
+#define __WY_SINGLE_FLOW_KEEPALIVE_H__
+
+#include <event.h>
+
+#include "nirvana_conhash.h"
+
+struct time_event
+{
+ struct event timer_event;
+ struct consistent_hash *conhash;
+};
+
+struct function_node
+{
+ char *dst_ip_str;
+ int32_t udp_sockfd;
+ int32_t ip_as_bucketid;
+ int32_t conhash_inserted;
+ int32_t retry_times;
+ struct sockaddr_in sinaddr;
+
+ struct event msgevent;
+ struct event alive_detect_timer;
+ struct event send_bfd_timer;
+
+ struct judian_as_group *parent;
+};
+
+struct forward_nodes
+{
+ char *dst_ip_str;
+ int32_t udp_sockfd;
+ int32_t dst_ip;
+ struct sockaddr_in sinaddr;
+ struct judian_as_group *parent;
+};
+
+struct judian_as_group
+{
+ char groupname[64];
+ struct function_node *func_nodes;
+ struct forward_nodes *fwd_nodes;
+ int32_t func_nodes_num;
+ int32_t fwd_nodes_num;
+ struct consistent_hash *conhash;
+};
+
+
+void *thread_funcnode_keepalive(void *arg);
+
+#endif
+