diff options
| -rw-r--r-- | CRDT/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | CRDT/ap_bloom.c | 555 | ||||
| -rw-r--r-- | CRDT/ap_bloom.h | 8 | ||||
| -rw-r--r-- | CRDT/ap_bloom_gtest.cpp | 69 | ||||
| -rw-r--r-- | CRDT/crdt_utils.c | 11 | ||||
| -rw-r--r-- | CRDT/crdt_utils.h | 29 |
6 files changed, 466 insertions, 208 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt index f636e8c..3b00795 100644 --- a/CRDT/CMakeLists.txt +++ b/CRDT/CMakeLists.txt @@ -28,4 +28,4 @@ add_executable(ap_bloom_gtest ap_bloom_gtest.cpp ${PROJECT_SOURCE_DIR}/CRDT/ap_bloom.c ${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c ${PROJECT_SOURCE_DIR}/CRDT/crdt_utils.c) -target_link_libraries(ap_bloom_gtest gtest-static pthread fieldstat4)
\ No newline at end of file +target_link_libraries(ap_bloom_gtest gtest-static pthread fieldstat4) diff --git a/CRDT/ap_bloom.c b/CRDT/ap_bloom.c index 51014e9..fe4eee4 100644 --- a/CRDT/ap_bloom.c +++ b/CRDT/ap_bloom.c @@ -2,7 +2,7 @@ #include "crdt_utils.h" #include "utlist.h" #include "utarray.h" - +#include "xxhash.h" #include <math.h> // log, ceil #include <stdio.h> // printf #include <assert.h> // assert @@ -11,62 +11,255 @@ #include <unistd.h> // write #include <stdint.h> //uint64_t #include <stdbool.h> + +#define APBM_USE_MEMPOOL 1 //use mempool to reduce page-faults +#if APBM_USE_MEMPOOL +#define APMOOL_ALLOC(mp, type, number) ((type *)apbm_mempool_alloc(mp, sizeof(type)*(number))) +#define APMOOL_FREE(mp, p) {apbm_mempool_free(mp, p); p=NULL;} +#else +#define APMOOL_ALLOC(mp, type, number) ALLOC(type, number) +#define APMOOL_FREE(mp, p) FREE(&p) +#endif + +#define USE_HUGE_PAGE (0) //precondition : sysctl -w vm.nr_hugepages=128,256,512... or other number by your needs +#define MEM_POOL_INIT_BLOCK_NUM (24) +#define MEM_ALIGN_SIZE() (getpagesize() > 4096 ? 4096: getpagesize()) + +#if APBM_USE_MEMPOOL +struct apbm_block_info{ + void *block_memory_head; + void *block_memory_tail; + void **block_heads; + int *block_inused_flags; + unsigned int block_size; + unsigned int block_num; + int ues_hugepage; + int to_flush_block_num; +}; +struct apbm_mempool{ + struct apbm_block_info large; + struct apbm_block_info small; +}; +enum mempool_flag{ + MEMPOOL_USABLE = 0, + MEMPOOL_INUSED = 1, +}; + +static int apbm_mempool_block_init(struct apbm_block_info *block, size_t block_size, int block_num) +{ + block->block_size = block_size; + block->block_num = block_num; +#if USE_HUGE_PAGE + block->block_memory_head = mmap(NULL, block_num * block_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0); + if(MAP_FAILED == block->block_memory_head){ + perror("dabm_mempool_init() mmap huge-page error, use glibc: "); + posix_memalign((void **)&block->block_memory_head, 64, block->block_num * block->block_size); + block->ues_hugepage = 0; + }else{ + block->ues_hugepage = 1; + } +#else + posix_memalign((void **)&block->block_memory_head, MEM_ALIGN_SIZE(), block->block_num * block->block_size); +#endif + block->block_memory_tail = (char *)block->block_memory_head + block->block_num * block->block_size; + memset(block->block_memory_head, 0, block->block_num * block->block_size); + block->block_heads = (void **)calloc(block->block_num, sizeof(void *)); + block->block_inused_flags = (int *)calloc(block->block_num, sizeof(int)); + for(int i=0; i<block->block_num; i++) + { + block->block_heads[i] = (char *)block->block_memory_head + i * block->block_size; + block->block_inused_flags[i] = 0; + } + return 0; +} +static void apbm_mp_block_free(struct apbm_block_info *block, void *p) +{ + for (int i = 0; i < block->block_num; i++) + { + if((block->block_heads[i] == p) + && (block->block_inused_flags[i] == MEMPOOL_INUSED)){ + memset(p, 0, block->block_size); //memset in free() + block->block_inused_flags[i] = MEMPOOL_USABLE; + return; + } + } + assert(0); +} +static void apbm_mp_block_destroy(struct apbm_block_info *block) +{ + if(block->block_memory_head){ + if(block->ues_hugepage){ + munmap(block->block_memory_head, block->block_num * block->block_size); + }else{ + free(block->block_memory_head); + } + block->block_memory_head = NULL; + } + if(block->block_heads){ + free(block->block_heads); + block->block_heads = NULL; + } + if(block->block_inused_flags){ + free(block->block_inused_flags); + block->block_inused_flags = NULL; + } + block->block_num = 0; + block->block_size = 0; +} +static void *apbm_mp_block_alloc(struct apbm_block_info *block, int size) +{ + for (int i = 0; i < block->block_num; i++){ + if(block->block_inused_flags[i] == MEMPOOL_USABLE){ + block->block_inused_flags[i] = MEMPOOL_INUSED; + return block->block_heads[i]; + } + } + return NULL; +} + +void *apbm_mempool_alloc(struct apbm_mempool *mp, int size) +{ + void *res = NULL; + if(size <= mp->small.block_size){ + res = apbm_mp_block_alloc(&mp->small, size); + }else if(size <= mp->large.block_size){ + res = apbm_mp_block_alloc(&mp->large, size); + } + if(NULL == res){ + //mempool full or size unmatch, alloc use glibc + res = calloc(size, sizeof(char)); + } + return res; +} +void apbm_mempool_free(struct apbm_mempool *mp, void *p) +{ + if((unsigned long long)p >= (unsigned long long)mp->small.block_memory_head + && (unsigned long long)p <= (unsigned long long)mp->small.block_memory_tail){ + return apbm_mp_block_free(&mp->small, p); + } + if((unsigned long long)p >= (unsigned long long)mp->large.block_memory_head + && (unsigned long long)p <= (unsigned long long)mp->large.block_memory_tail){ + return apbm_mp_block_free(&mp->large, p); + } + //not allocated from mempool + free(p); +} +struct apbm_mempool *apbm_mempool_new(size_t large_block_size, int large_block_num, size_t small_block_size, int small_block_num) +{ + struct apbm_mempool *mp = (struct apbm_mempool *)calloc(1, sizeof(struct apbm_mempool)); + memset(mp, 0, sizeof(struct apbm_mempool)); + apbm_mempool_block_init(&mp->large, large_block_size, large_block_num); + apbm_mempool_block_init(&mp->small, small_block_size, small_block_num); + return mp; +} +void apbm_mempool_destroy(struct apbm_mempool *mp) +{ + apbm_mp_block_destroy(&mp->large); + apbm_mp_block_destroy(&mp->small); + free(mp); +} +#endif + + /* source * https://github.com/RedisBloom/RedisBloom/blob/AgePartitionedBF/contrib/agingBloom.c */ #define SLICE_EXPANSION 2 -#define FILL_RATIO_THRESHOLD 0.1 +#define FILL_RATIO_THRESHOLD 0.5 #define MIN_CAPACITY 1000 - //The test_bit_set_bit() code snap is from RedisBloom (BSD License). //Source https://github.com/RedisBloom/RedisBloom/blob/master/deps/bloom/bloom.c #define MODE_READ 0 #define MODE_WRITE 1 -//return 1 if the bit is already set, 0 if it was not set -inline static int test_bit_set_bit(unsigned char *buf, uint64_t x, int mode) { - uint64_t byte = x >> 3; - uint8_t mask = 1 << (x % 8); - uint8_t c = buf[byte]; // expensive memory access +struct AP_bloom_stat{ + size_t slice_new_num_acc; + size_t slice_free_num_acc; + size_t expand_num_acc; + size_t expand_max_multiple_rt; + size_t slice_num_rt; +}; +struct AP_configuration +{ + double error; + long long capacity; + long long time_window_ms; + long long time_slice_num; + struct timeval last_cfg; +}; +struct AP_bloom +{ + //high level variables determined by caller + struct AP_configuration cfg; + //low level variables caculated by the call configure + int hash_num; //k + int chain_num; //hash_num+time_slice_num + int default_slice_size; // in bytes + + //runtime variables + int cursor; + struct timeval last_slide; + struct ap_slice **heads; //hash_num+timeframe_num + + struct AP_bloom_stat stat; + struct apbm_mempool *mp; +}; + +//return 1 if the bit is already set, 0 if it was not set +inline static int set_bit(unsigned char *buf, uint32_t bit_index, uint32_t byte_index) { + uint8_t mask = 1 << (bit_index % 8); + uint8_t c = buf[byte_index]; // expensive memory access, maybe trigger page-fault events if (c & mask) { return 1; } else { - if (mode == MODE_WRITE) { - buf[byte] = c | mask; - } + buf[byte_index] = c | mask; return 0; } } +inline static int test_bit(const unsigned char *buf, uint32_t bit_index, uint32_t byte_index) { + uint8_t mask = 1 << (bit_index % 8); + uint8_t c = buf[byte_index]; // expensive memory access, maybe trigger page-fault events + return c & mask; +} + struct ap_slice { - int hash_index; + unsigned int hash_index; int popcount; //the number of 1-bits in the slice - int slice_size; //in bytes + unsigned int slice_size; //in bytes struct timeval last_insert; struct timeval first_insert; unsigned char * data; struct ap_slice * next; }; -static struct ap_slice *ap_slice_new(int slice_size, int hash_index) +static struct ap_slice *ap_slice_new(struct AP_bloom *bloom, int slice_size, int hash_index) { - struct ap_slice *slice = ALLOC(struct ap_slice, 1); + struct ap_slice *slice = APMOOL_ALLOC(bloom->mp, struct ap_slice, 1); slice->slice_size = slice_size; assert(slice_size % sizeof(unsigned long long) == 0); //ap_slice_chain_deserialize may alloc slice_size=0 - if(slice_size > 0) slice->data = ALLOC(unsigned char, slice_size); + if(slice_size > 0){ + slice->data = APMOOL_ALLOC(bloom->mp, unsigned char, slice_size); + memset(slice->data, 0, slice_size); + } slice->hash_index = hash_index; slice->popcount = 0; + + bloom->stat.slice_new_num_acc++; + bloom->stat.slice_num_rt++; return slice; } -static void ap_slice_free(struct ap_slice *slice) +static void ap_slice_free(struct AP_bloom *bloom, struct ap_slice *slice) { - free(slice->data); - slice->data = NULL; - free(slice); + APMOOL_FREE(bloom->mp, slice->data); + // slice->data = NULL; + APMOOL_FREE(bloom->mp, slice); + bloom->stat.slice_free_num_acc++; + bloom->stat.slice_num_rt--; } void ap_slice_chain_info(const struct ap_slice *head, double *fill_ratio, int *slice_num) { @@ -105,134 +298,53 @@ static int ap_slice_cmp(const struct ap_slice *a, const struct ap_slice *b) //bigger size first return b->slice_size - a->slice_size; } -static struct ap_slice * ap_slice_duplicate(const struct ap_slice *slice) +static struct ap_slice * ap_slice_duplicate(struct AP_bloom *bloom, const struct ap_slice *slice) { - struct ap_slice *new_slice = ap_slice_new(slice->slice_size, slice->hash_index); + struct ap_slice *new_slice = ap_slice_new(bloom, slice->slice_size, slice->hash_index); memcpy(new_slice, slice, offsetof(struct ap_slice, data)); memcpy(new_slice->data, slice->data, slice->slice_size); return new_slice; } #define HASH_TO_OFFSET(hash, slice) ((hash.a + slice->hash_index * hash.b) % (slice->slice_size<<3)) -void ap_slice_add_hash(struct ap_slice *slice, struct double_hash hash, struct timeval now) +void ap_slice_add_hash(struct ap_slice *slice, const struct double_hash *hash, struct timeval now) { - int index=double_hash_generate(&hash, slice->hash_index, slice->slice_size<<3); - int added = !test_bit_set_bit(slice->data, index, MODE_WRITE); + unsigned int bit_index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3); + unsigned int byte_index = bit_index>>3; + __builtin_prefetch(&slice->data[byte_index], 1); + int added = !set_bit(slice->data, bit_index, byte_index); + slice->popcount += added; slice->last_insert = now; - if(slice->popcount == 0) + if(unlikely(slice->popcount == 0)) { slice->first_insert = now; } - slice->popcount += added; return; } -int ap_slice_check_hash(const struct ap_slice *slice, struct double_hash hash) -{ - int index=double_hash_generate(&hash, slice->hash_index, slice->slice_size<<3); - return test_bit_set_bit(slice->data, index, MODE_READ); -} -struct ap_slice_event -{ - struct timeval timestamp; - int slice_size; - int hash_index; - int event;//1: start, -1: end -}; -int slice_event_cmp(const void *a, const void *b) -{ - const struct ap_slice_event *ea = (struct ap_slice_event*)a; - const struct ap_slice_event *eb = (struct ap_slice_event*)b; - if(timercmp(&ea->timestamp, &eb->timestamp, <)) - { - return -1; - } - if(timercmp(&ea->timestamp, &eb->timestamp, >)) - { - return 1; - } - return eb->event - ea->event; -} -UT_icd slice_event_icd = {sizeof(struct ap_slice_event), NULL, NULL, NULL}; -struct ap_state -{ - int consecutive_matches; - int visited_slices; - int hash_num; - UT_array slice_time_events; -}; -void ap_state_init(struct ap_state *state, int hash_num) -{ - state->consecutive_matches=0; - state->visited_slices=0; - state->hash_num=hash_num; - utarray_init(&state->slice_time_events, &slice_event_icd); - utarray_reserve(&state->slice_time_events, hash_num*2); -} -void ap_state_clear(struct ap_state *state) +int ap_slice_check_hash(const struct ap_slice *slice, const struct double_hash *hash) { - state->consecutive_matches=0; - state->visited_slices=0; - utarray_clear(&state->slice_time_events); -} -void ap_state_done(struct ap_state *state) -{ - utarray_done(&state->slice_time_events); -} -int ap_state_is_match(struct ap_state *state) -{ - int counter=0; - struct ap_slice_event *ev=NULL; - if(state->consecutive_matches >= state->hash_num) - { - if(state->visited_slices==state->consecutive_matches)//fastpath for no slice expansion - { - return 1; - } - utarray_sort(&state->slice_time_events, slice_event_cmp); - while((ev=(struct ap_slice_event*)utarray_next(&state->slice_time_events, ev))) - { - counter += ev->event; - if(counter == state->hash_num) - { - return 1; - } - } - } - return 0; + unsigned int bit_index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3); + unsigned int byte_index = bit_index>>3; + __builtin_prefetch(&slice->data[byte_index], 0); + return test_bit(slice->data, bit_index, byte_index); } -static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double_hash hash, struct ap_state *state) + +static int ap_slice_chain_check_hash(const struct ap_slice *head, const struct double_hash *hash) { + TIME_RECORD(); //In a stackable (scalable) Bloom filter, checking for membership now involves inspecting each layer for presence. const struct ap_slice *slice=NULL; - struct ap_slice_event ev; int found=0; LL_FOREACH(head, slice) { if(ap_slice_check_hash(slice, hash)) { - ev.timestamp = slice->first_insert; - ev.event = 1; - ev.slice_size=slice->slice_size; - ev.hash_index=slice->hash_index; - utarray_push_back(&state->slice_time_events, &ev); - ev.timestamp = slice->last_insert; - ev.event = -1; - ev.slice_size = slice->slice_size; - ev.hash_index = slice->hash_index; - utarray_push_back(&state->slice_time_events, &ev); found=1; + break; //quick path } - state->visited_slices++; - } - if(found) - { - state->consecutive_matches++; - } - else - { - ap_state_clear(state); } - return; + TIME_DIFF(); + return found; } /* Return: @@ -240,13 +352,17 @@ static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double * 0 - element was not present and was added * 1 - element (or a collision) had already been added previously */ -static void ap_slice_chain_add_hash(struct ap_slice **head, struct double_hash hash, struct timeval now) +static void ap_slice_chain_add_hash(struct AP_bloom *bloom, struct ap_slice **head, const struct double_hash *hash, struct timeval now) { //add new slice if the current slice is full - if( (double) (*head)->popcount / ((*head)->slice_size<<3) > FILL_RATIO_THRESHOLD) + if(unlikely( (double) (*head)->popcount / ((*head)->slice_size<<3) > FILL_RATIO_THRESHOLD)) { - struct ap_slice *new_slice = ap_slice_new((*head)->slice_size * SLICE_EXPANSION, (*head)->hash_index); + struct ap_slice *new_slice = ap_slice_new(bloom, (*head)->slice_size * SLICE_EXPANSION, (*head)->hash_index); LL_PREPEND((*head), new_slice); + bloom->stat.expand_num_acc++; + if(new_slice->slice_size/bloom->default_slice_size > bloom->stat.expand_max_multiple_rt){ + bloom->stat.expand_max_multiple_rt = new_slice->slice_size/bloom->default_slice_size; + } } //Add it to the current (head) slice. ap_slice_add_hash(*head, hash, now); @@ -265,22 +381,22 @@ void ap_slice_chain_first_insert_time(const struct ap_slice *head, struct timeva } return; } -static void ap_slice_chain_free(struct ap_slice *head) +static void ap_slice_chain_free(struct AP_bloom *bloom, struct ap_slice *head) { struct ap_slice *slice=NULL, *tmp=NULL; LL_FOREACH_SAFE(head, slice, tmp) { LL_DELETE(head, slice); - ap_slice_free(slice); + ap_slice_free(bloom, slice); } } -static struct ap_slice *ap_slice_chain_duplicate(const struct ap_slice *src) +static struct ap_slice *ap_slice_chain_duplicate(struct AP_bloom *bloom, const struct ap_slice *src) { struct ap_slice *new_head=NULL; const struct ap_slice *slice=NULL; LL_FOREACH(src, slice) { - struct ap_slice * new_slice=ap_slice_duplicate(slice); + struct ap_slice * new_slice=ap_slice_duplicate(bloom, slice); LL_APPEND(new_head, new_slice); } return new_head; @@ -296,6 +412,16 @@ static size_t ap_slice_chain_mem_size(const struct ap_slice *head) } return sz; } +static size_t ap_slice_chain_mem_blocks(const struct ap_slice *head) +{ + size_t blocks=0; + const struct ap_slice *slice=NULL; + LL_FOREACH(head, slice) + { + blocks += 2; + } + return blocks; +} struct slice_chain_header { int chain_sequence; @@ -341,14 +467,14 @@ static size_t ap_slice_chain_serialize(const struct ap_slice *head, int chain_se assert(offset<=buffer_sz); return offset; } -static struct ap_slice *ap_slice_chain_deserialize(const char *buffer, size_t buffer_sz) +static struct ap_slice *ap_slice_chain_deserialize(struct AP_bloom *bloom, const char *buffer, size_t buffer_sz) { struct slice_chain_header *header=(struct slice_chain_header*)buffer; size_t offset=sizeof(struct slice_chain_header); struct ap_slice *head=NULL; for(size_t i=0; i<header->slice_number; i++) { - struct ap_slice *slice = ap_slice_new(0, 0); + struct ap_slice *slice = ap_slice_new(bloom, 0, 0); memcpy(slice, buffer+offset, offsetof(struct ap_slice, data)); offset += offsetof(struct ap_slice, data); slice->data = (unsigned char*) malloc(slice->slice_size); @@ -362,7 +488,7 @@ static struct ap_slice *ap_slice_chain_deserialize(const char *buffer, size_t bu } return head; } -static void ap_slice_merge(struct ap_slice *dst, const struct ap_slice *src) +static void ap_slice_merge(struct AP_bloom *bloom, struct ap_slice *dst, const struct ap_slice *src) { assert(dst->hash_index == src->hash_index); assert(dst->slice_size == src->slice_size); @@ -393,7 +519,7 @@ static void ap_slice_merge(struct ap_slice *dst, const struct ap_slice *src) } return; } -static void ap_slice_chain_merge(struct ap_slice **dst_head, const struct ap_slice *src_head) +static void ap_slice_chain_merge(struct AP_bloom *bloom, struct ap_slice **dst_head, const struct ap_slice *src_head) { const struct ap_slice *src_slice=NULL; int merged=0; @@ -409,43 +535,20 @@ static void ap_slice_chain_merge(struct ap_slice **dst_head, const struct ap_sli { if(src_slice->slice_size == dst_slice->slice_size) { - ap_slice_merge(dst_slice, src_slice); + ap_slice_merge(bloom, dst_slice, src_slice); merged=1; break; } } if(!merged) { - struct ap_slice *new_slice = ap_slice_duplicate(src_slice); + struct ap_slice *new_slice = ap_slice_duplicate(bloom, src_slice); LL_APPEND(*dst_head, new_slice); } } LL_SORT(*dst_head, ap_slice_cmp); ap_slice_sanity(*dst_head); } -struct AP_configuration -{ - double error; - long long capacity; - long long time_window_ms; - long long time_slice_num; - struct timeval last_cfg; -}; -struct AP_bloom -{ - //high level variables determined by caller - struct AP_configuration cfg; - - //low level variables caculated by the call configure - int hash_num; //k - int chain_num; //hash_num+time_slice_num - int default_slice_size; // in bytes - - //runtime variables - int cursor; - struct timeval last_slide; - struct ap_slice **heads; //hash_num+timeframe_num -}; #define DBL_EPSILON 2.2204460492503131e-16 bool definitelyLessThan(float a, float b) { @@ -470,18 +573,26 @@ struct AP_bloom *AP_bloom_new(struct timeval now, double error_rate, long long c bloom->cfg.last_cfg = now; bloom->cfg.time_slice_num = time_slice_num; - bloom->last_slide = now; bloom->cursor = 0; bloom->default_slice_size = ceil(capacity/log(2)); bloom->default_slice_size = ceil((double)bloom->default_slice_size/64)*64/8; + if(bloom->default_slice_size % MEM_ALIGN_SIZE() != 0){ + bloom->default_slice_size = (bloom->default_slice_size / MEM_ALIGN_SIZE() + 1) * MEM_ALIGN_SIZE(); + } + assert(0 == bloom->default_slice_size % MEM_ALIGN_SIZE()); +#if APBM_USE_MEMPOOL + bloom->mp = apbm_mempool_new(bloom->default_slice_size, MEM_POOL_INIT_BLOCK_NUM, 128, MEM_POOL_INIT_BLOCK_NUM); +#endif bloom->chain_num = bloom->hash_num + bloom->cfg.time_slice_num; - bloom->heads = ALLOC(struct ap_slice*, bloom->chain_num); + bloom->heads = APMOOL_ALLOC(bloom->mp, struct ap_slice*, bloom->chain_num); for(int i=0; i<bloom->chain_num; i++) { - bloom->heads[i] = ap_slice_new(bloom->default_slice_size, i % bloom->chain_num); + bloom->heads[i] = ap_slice_new(bloom, bloom->default_slice_size, i % bloom->chain_num); } - + bloom->stat.expand_max_multiple_rt = 1; + bloom->stat.slice_num_rt = bloom->chain_num; + bloom->last_slide = now; return bloom; } @@ -490,23 +601,25 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now) int chain_num = bloom->chain_num; long long slide_time_frame_us=bloom->cfg.time_slice_num?(bloom->cfg.time_window_ms*1000/bloom->cfg.time_slice_num):INT64_MAX; long long slide_us=timeval_delta_us(bloom->last_slide, now); - long long elapse_us=timeval_delta_us(bloom->cfg.last_cfg, now); - long long epoches=elapse_us/slide_time_frame_us; - if(slide_us < slide_time_frame_us) + if(likely(slide_us < slide_time_frame_us)) { return; } + TIME_RECORD(); + long long elapse_us=timeval_delta_us(bloom->cfg.last_cfg, now); + long long epoches=elapse_us/slide_time_frame_us; slide_us -= slide_us % slide_time_frame_us; int n_slide = slide_us/slide_time_frame_us; - + int reset_hash_index; if(n_slide < bloom->cfg.time_slice_num) { for(int i=0; i<n_slide; i++) { int reset_idx = (bloom->cursor + bloom->hash_num) % chain_num; - struct ap_slice *slice = ap_slice_new(bloom->default_slice_size, bloom->heads[reset_idx]->hash_index); - ap_slice_chain_free(bloom->heads[reset_idx]); + reset_hash_index = bloom->heads[reset_idx]->hash_index; + ap_slice_chain_free(bloom, bloom->heads[reset_idx]); + struct ap_slice *slice = ap_slice_new(bloom, bloom->default_slice_size, reset_hash_index); bloom->heads[reset_idx] = slice; bloom->cursor = (bloom->cursor + 1) % chain_num; } @@ -515,76 +628,90 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now) { for(int i=0; i<chain_num; i++) { - struct ap_slice *new_slice=ap_slice_new(bloom->default_slice_size, bloom->heads[i]->hash_index); - ap_slice_chain_free(bloom->heads[i]); + reset_hash_index = bloom->heads[i]->hash_index; + ap_slice_chain_free(bloom, bloom->heads[i]); + struct ap_slice *new_slice=ap_slice_new(bloom, bloom->default_slice_size, reset_hash_index); bloom->heads[i] = new_slice; } bloom->cursor = (bloom->cursor + n_slide) % chain_num; } + struct timeval slide_time; slide_time.tv_sec = slide_us/1000/1000; slide_time.tv_usec = slide_us%(1000*1000); timeradd(&bloom->last_slide, &slide_time, &bloom->last_slide); assert(bloom->cursor == (epoches % chain_num) || bloom->cursor == (epoches % chain_num + 1) % chain_num); + TIME_DIFF(); return; } void AP_bloom_add(struct AP_bloom *bloom, struct timeval now, const char *buffer, int len) { - struct double_hash hash; - double_hash_init(&hash, buffer, len); - if(bloom->cfg.time_window_ms) + TIME_RECORD(); + struct double_hash hash; + double_hash_init(&hash, buffer, (size_t)len); + if(bloom->cfg.time_slice_num && bloom->cfg.time_window_ms) { slide_time(bloom, now); } for(int i=0; i<bloom->hash_num; i++) { int idx=(bloom->cursor+i) % (bloom->chain_num); - ap_slice_chain_add_hash(&(bloom->heads[idx]), hash, now); + ap_slice_chain_add_hash(bloom,&(bloom->heads[idx]), &hash, now); } + TIME_DIFF(); return; } int AP_bloom_check(const struct AP_bloom *bloom, struct timeval now, const char *buffer, int len) { - if(timeval_delta_ms(bloom->cfg.last_cfg, now) < 0) + if(unlikely(timeval_delta_ms(bloom->cfg.last_cfg, now) < 0)) { return 0; } + TIME_RECORD(); struct double_hash hash; - double_hash_init(&hash, buffer, len); + double_hash_init(&hash, buffer, (size_t)len); int chain_num = bloom->chain_num; - - struct ap_state state; - ap_state_init(&state, bloom->hash_num); - for(int i = 0; i < bloom->hash_num+chain_num; i++) + int match_num = 0, unmatch_num = 0; + for(int i = 0; i < chain_num; i++) { - long long delta_us = timeval_delta_us(bloom->heads[i%chain_num]->last_insert, now); - if(bloom->cfg.time_window_ms && delta_us > bloom->cfg.time_window_ms*1000) + long long delta_us = timeval_delta_us(bloom->heads[i]->last_insert, now); + if(unlikely((delta_us > bloom->cfg.time_window_ms*1000) + && bloom->cfg.time_window_ms)) { - ap_state_clear(&state); continue; } - ap_slice_chain_check_hash(bloom->heads[i%chain_num], hash, &state); - if(ap_state_is_match(&state)) - { - ap_state_done(&state); - return 1; + if(ap_slice_chain_check_hash(bloom->heads[i], &hash)){ + match_num++; + }else{ + unmatch_num++; + } + if(unmatch_num > (chain_num - bloom->hash_num)){ + return 0; //quick path for check false } } - ap_state_done(&state); + if(match_num >= bloom->hash_num) + { + TIME_DIFF(); + return 1; + } + TIME_DIFF(); return 0; } void AP_bloom_free(struct AP_bloom *bloom) { for(int i=0; i<bloom->chain_num; i++) { - ap_slice_chain_free(bloom->heads[i]); + ap_slice_chain_free(bloom, bloom->heads[i]); bloom->heads[i]=NULL; } - free(bloom->heads); - bloom->heads=NULL; + APMOOL_FREE(bloom->mp, bloom->heads); + +#if APBM_USE_MEMPOOL + apbm_mempool_destroy(bloom->mp); +#endif free(bloom); } size_t AP_bloom_mem_size(const struct AP_bloom *bloom) @@ -598,6 +725,17 @@ size_t AP_bloom_mem_size(const struct AP_bloom *bloom) } return sz; } +size_t AP_bloom_mem_blocks(const struct AP_bloom *bloom) +{ + size_t blocks=0; + blocks++; + blocks += bloom->chain_num; + for(int i=0; i<bloom->chain_num; i++) + { + blocks += ap_slice_chain_mem_blocks(bloom->heads[i]); + } + return blocks; +} size_t AP_bloom_serialize_size(const struct AP_bloom *bloom) { size_t sz=0; @@ -625,14 +763,14 @@ void AP_bloom_serialize(const struct AP_bloom *bloom, char **blob, size_t *blob_ } struct AP_bloom * AP_bloom_deserialize(const char *blob, size_t blob_sz) { - struct AP_bloom *bloom=ALLOC(struct AP_bloom, 1); + struct AP_bloom *bloom=ALLOC(struct AP_bloom, 1); size_t offset=0; memcpy(bloom, blob+offset, offsetof(struct AP_bloom, heads)); offset += offsetof(struct AP_bloom, heads); bloom->heads=ALLOC(struct ap_slice*, bloom->chain_num); for(int i=0; i<bloom->chain_num; i++) { - bloom->heads[i]=ap_slice_chain_deserialize(blob+offset, blob_sz-offset); + bloom->heads[i]=ap_slice_chain_deserialize(bloom, blob+offset, blob_sz-offset); offset += ap_slice_chain_serialize_size(bloom->heads[i]); } assert(offset==blob_sz); @@ -651,7 +789,7 @@ void AP_bloom_merge(struct AP_bloom *dst, const struct AP_bloom *src) { for(int i=0; i<dst->chain_num; i++) { - ap_slice_chain_free(dst->heads[i]); + ap_slice_chain_free(dst, dst->heads[i]); } memcpy(dst, src, offsetof(struct AP_bloom, heads)); @@ -659,7 +797,7 @@ void AP_bloom_merge(struct AP_bloom *dst, const struct AP_bloom *src) dst->heads = ALLOC(struct ap_slice*, dst->chain_num); for(int i=0; i<dst->chain_num; i++) { - dst->heads[i] = ap_slice_chain_duplicate(src->heads[i]); + dst->heads[i] = ap_slice_chain_duplicate(dst, src->heads[i]); } } return; @@ -680,7 +818,7 @@ void AP_bloom_merge(struct AP_bloom *dst, const struct AP_bloom *src) { continue; } - ap_slice_chain_merge(&(dst->heads[i]), src->heads[i]); + ap_slice_chain_merge(dst,&(dst->heads[i]), src->heads[i]); } return; } @@ -766,4 +904,29 @@ void AP_bloom_info(const struct AP_bloom *bloom, struct AP_bloom_info *info) info->approximate_item_num = approximate_item_num(bloom); info->oldest_item_time = oldest_item; return; +} + +size_t AP_bloom_stat_get_hash_num(const struct AP_bloom *bloom) +{ + return bloom->hash_num; +} +size_t AP_bloom_stat_get_new_num(const struct AP_bloom *bloom) +{ + return bloom->stat.slice_new_num_acc; +} +size_t AP_bloom_stat_get_free_num(const struct AP_bloom *bloom) +{ + return bloom->stat.slice_free_num_acc; +} +size_t AP_bloom_stat_get_expand_num(const struct AP_bloom *bloom) +{ + return bloom->stat.expand_num_acc; +} +size_t AP_bloom_stat_get_cur_slice_num(const struct AP_bloom *bloom) +{ + return bloom->stat.slice_num_rt; +} +size_t AP_bloom_stat_get_expand_max_multiple(const struct AP_bloom *bloom) +{ + return bloom->stat.expand_max_multiple_rt; }
\ No newline at end of file diff --git a/CRDT/ap_bloom.h b/CRDT/ap_bloom.h index c40b5b9..73c6502 100644 --- a/CRDT/ap_bloom.h +++ b/CRDT/ap_bloom.h @@ -73,6 +73,14 @@ struct AP_bloom *AP_bloom_replicate(uuid_t uuid, const char *blob, size_t blob_s size_t AP_bloom_serialized_size(const struct AP_bloom *bloom); size_t AP_bloom_mem_size(const struct AP_bloom *bloom); +size_t AP_bloom_stat_get_hash_num(const struct AP_bloom *bloom); +size_t AP_bloom_mem_blocks(const struct AP_bloom *bloom); +size_t AP_bloom_stat_get_new_num(const struct AP_bloom *bloom); +size_t AP_bloom_stat_get_free_num(const struct AP_bloom *bloom); +size_t AP_bloom_stat_get_expand_num(const struct AP_bloom *bloom); +size_t AP_bloom_stat_get_cur_slice_num(const struct AP_bloom *bloom); +size_t AP_bloom_stat_get_expand_max_multiple(const struct AP_bloom *bloom); + #ifdef __cplusplus } #endif
\ No newline at end of file diff --git a/CRDT/ap_bloom_gtest.cpp b/CRDT/ap_bloom_gtest.cpp index b0f10d9..0f105c4 100644 --- a/CRDT/ap_bloom_gtest.cpp +++ b/CRDT/ap_bloom_gtest.cpp @@ -18,7 +18,7 @@ static unsigned int BM_CAPACITY = 10000000; static double BM_ERROR_RATE = 0.000001d; static int BM_TIMEOUT = 10 * 1000; // ms -static int BM_SLICE_NUM = 0; +static int BM_SLICE_NUM = 3; static int MAX_ITEM_NUM = 10000000; static int ITEM_BATCH_NUM = 1000; #define TUPLE4_ADDR_LEN (12 + 1) // sizeof(tuple4) add begin char 'Y' or 'N' @@ -89,7 +89,9 @@ static void bm_add_item(struct AP_bloom *bloom_filter, char *tuple4_buf, unsigne } total_add_time += (unsigned long long)time_diff; - fieldstat_easy_histogram_record(fs4_instance, 0, fs4_add_metric_id, &FS4_HISGRAM_TAG, 1, time_diff); + if(fs4_instance){ + fieldstat_easy_histogram_record(fs4_instance, 0, fs4_add_metric_id, &FS4_HISGRAM_TAG, 1, time_diff); + } } static int bm_search_item(struct AP_bloom *bloom_filter, char *tuple4_buf, unsigned int index, int fs4_metric_id) @@ -111,7 +113,9 @@ static int bm_search_item(struct AP_bloom *bloom_filter, char *tuple4_buf, unsig } total_search_time += (unsigned long long)time_diff; - fieldstat_easy_histogram_record(fs4_instance, 0, fs4_metric_id, &FS4_HISGRAM_TAG, 1, time_diff); + if(fs4_instance){ + fieldstat_easy_histogram_record(fs4_instance, 0, fs4_metric_id, &FS4_HISGRAM_TAG, 1, time_diff); + } return ret; } @@ -310,6 +314,65 @@ TEST(apbloom, perf) EXPECT_EQ(0, ap_bloom_filter_test(g_user_define_args)); } +static int apbloom_merge_test_run(void) +{ + int ret = 0; + const int max_item_num = 1000000; + struct timeval current_time_tv; + gettimeofday(¤t_time_tv, NULL); + + struct AP_bloom *apbma = (struct AP_bloom *)AP_bloom_new(g_current_time_tv, BM_ERROR_RATE, BM_CAPACITY, 1000 * 999, BM_SLICE_NUM); + assert(apbma); + char tuple4_buf[1024] = {}; + + tuple4_buf[0] = 'A'; + for(int i = 0; i < max_item_num; i++){ + bm_add_item(apbma, tuple4_buf, i); + } + + struct AP_bloom *apbmb = (struct AP_bloom *)AP_bloom_new(g_current_time_tv, BM_ERROR_RATE, BM_CAPACITY, 1000 * 999, BM_SLICE_NUM); + assert(apbmb); + + tuple4_buf[0] = 'B'; + for(int i = 0; i < max_item_num; i++){ + bm_add_item(apbmb, tuple4_buf, i); + } + + tuple4_buf[0] = 'B'; + for(int i = 0; i < max_item_num; i++){ + ret = bm_search_item(apbma, tuple4_buf, i, -1); + if(ret > 0){ + fprintf(stderr, "found tuple4 with B apbma before merge!\n"); + return -1; + } + } + + AP_bloom_merge(apbma, apbmb); + + for(int i = 0; i < max_item_num; i++){ + tuple4_buf[0] = 'A'; + ret = bm_search_item(apbma, tuple4_buf, i, -1); + if(ret <= 0){ + fprintf(stderr, "not found tuple4 with A after merge, index:%d\n", i); + return -1; + } + tuple4_buf[0] = 'B'; + ret = bm_search_item(apbma, tuple4_buf, i, -1); + if(ret <= 0){ + fprintf(stderr, "not found tuple4 with B after merge, index:%d\n", i); + return -1; + } + } + AP_bloom_free(apbma); + AP_bloom_free(apbmb); + return 0; +} + +TEST(apbloom, merge) +{ + EXPECT_EQ(0, apbloom_merge_test_run()); +} + static const char *gtest_cla_short_options = "hLf:u:"; static const struct option gtest_cla_long_options[] = diff --git a/CRDT/crdt_utils.c b/CRDT/crdt_utils.c index c796081..88f823d 100644 --- a/CRDT/crdt_utils.c +++ b/CRDT/crdt_utils.c @@ -3,11 +3,10 @@ void double_hash_init(struct double_hash *rv, const void *buffer, int len) { - rv->a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c); - rv->b=XXH3_64bits_withSeed(buffer, len, rv->a); + // rv->a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c); + // rv->b=XXH3_64bits_withSeed(buffer, len, rv->a); + XXH128_hash_t xxh128b = XXH3_128bits_withSeed(buffer, len, 0x9747b28c); + rv->a = xxh128b.high64; + rv->b = xxh128b.low64; return; -} -int double_hash_generate(const struct double_hash *rv, int i, int m) -{ - return (rv->a + i * rv->b) % m; }
\ No newline at end of file diff --git a/CRDT/crdt_utils.h b/CRDT/crdt_utils.h index 951e44b..57c34b5 100644 --- a/CRDT/crdt_utils.h +++ b/CRDT/crdt_utils.h @@ -37,7 +37,7 @@ // Double hashing for fast hashing, // reference: Kirsch, Adam, and Michael Mitzenmacher. "Less hashing, same performance: Building a better bloom filter." -// Algorithms–ESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006. +// AlgorithmsCESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006. // https://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf struct double_hash { @@ -45,4 +45,29 @@ struct double_hash uint64_t b; }; void double_hash_init(struct double_hash *rv, const void *buffer, int len); -int double_hash_generate(const struct double_hash *rv, int i, int m);
\ No newline at end of file +static inline unsigned int double_hash_generate(const struct double_hash *rv, unsigned int i, unsigned int m) +{ + return (rv->a + i * rv->b) % m; +} + +#define TIME_MEASURE 0 //to observe delay jitter +#if TIME_MEASURE +#define MAX_DELAY (100000) //ns +#define TIME_RECORD() struct timespec _start_time, _end_time; long long time_diff_ns; clock_gettime(CLOCK_REALTIME, &_start_time) +#define TIME_DIFF() \ + do { \ + clock_gettime(CLOCK_REALTIME, &_end_time); \ + if (likely(_end_time.tv_sec == _start_time.tv_sec))\ + {\ + time_diff_ns = (_end_time.tv_nsec - _start_time.tv_nsec);\ + }else{\ + time_diff_ns = ((long long)_end_time.tv_sec * 1000 * 1000 * 1000 + _end_time.tv_nsec) - ((long long)_start_time.tv_sec * 1000 * 1000 * 1000 + _start_time.tv_nsec);\ + }\ + if(unlikely(time_diff_ns > MAX_DELAY)){\ + fprintf(stderr, "%s:%d, timestamp:%lld.%lld, time diff:%ld ns\n", __FILE__, __LINE__, (long long)_start_time.tv_sec, (long long)_start_time.tv_nsec, time_diff_ns);\ + }\ + }while (0) +#else +#define TIME_RECORD() +#define TIME_DIFF() +#endif
\ No newline at end of file |
