summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CRDT/CMakeLists.txt2
-rw-r--r--CRDT/ap_bloom.c555
-rw-r--r--CRDT/ap_bloom.h8
-rw-r--r--CRDT/ap_bloom_gtest.cpp69
-rw-r--r--CRDT/crdt_utils.c11
-rw-r--r--CRDT/crdt_utils.h29
6 files changed, 466 insertions, 208 deletions
diff --git a/CRDT/CMakeLists.txt b/CRDT/CMakeLists.txt
index f636e8c..3b00795 100644
--- a/CRDT/CMakeLists.txt
+++ b/CRDT/CMakeLists.txt
@@ -28,4 +28,4 @@ add_executable(ap_bloom_gtest ap_bloom_gtest.cpp
${PROJECT_SOURCE_DIR}/CRDT/ap_bloom.c
${PROJECT_SOURCE_DIR}/deps/xxhash/xxhash.c
${PROJECT_SOURCE_DIR}/CRDT/crdt_utils.c)
-target_link_libraries(ap_bloom_gtest gtest-static pthread fieldstat4) \ No newline at end of file
+target_link_libraries(ap_bloom_gtest gtest-static pthread fieldstat4)
diff --git a/CRDT/ap_bloom.c b/CRDT/ap_bloom.c
index 51014e9..fe4eee4 100644
--- a/CRDT/ap_bloom.c
+++ b/CRDT/ap_bloom.c
@@ -2,7 +2,7 @@
#include "crdt_utils.h"
#include "utlist.h"
#include "utarray.h"
-
+#include "xxhash.h"
#include <math.h> // log, ceil
#include <stdio.h> // printf
#include <assert.h> // assert
@@ -11,62 +11,255 @@
#include <unistd.h> // write
#include <stdint.h> //uint64_t
#include <stdbool.h>
+
+#define APBM_USE_MEMPOOL 1 //use mempool to reduce page-faults
+#if APBM_USE_MEMPOOL
+#define APMOOL_ALLOC(mp, type, number) ((type *)apbm_mempool_alloc(mp, sizeof(type)*(number)))
+#define APMOOL_FREE(mp, p) {apbm_mempool_free(mp, p); p=NULL;}
+#else
+#define APMOOL_ALLOC(mp, type, number) ALLOC(type, number)
+#define APMOOL_FREE(mp, p) FREE(&p)
+#endif
+
+#define USE_HUGE_PAGE (0) //precondition : sysctl -w vm.nr_hugepages=128,256,512... or other number by your needs
+#define MEM_POOL_INIT_BLOCK_NUM (24)
+#define MEM_ALIGN_SIZE() (getpagesize() > 4096 ? 4096: getpagesize())
+
+#if APBM_USE_MEMPOOL
+struct apbm_block_info{
+ void *block_memory_head;
+ void *block_memory_tail;
+ void **block_heads;
+ int *block_inused_flags;
+ unsigned int block_size;
+ unsigned int block_num;
+ int ues_hugepage;
+ int to_flush_block_num;
+};
+struct apbm_mempool{
+ struct apbm_block_info large;
+ struct apbm_block_info small;
+};
+enum mempool_flag{
+ MEMPOOL_USABLE = 0,
+ MEMPOOL_INUSED = 1,
+};
+
+static int apbm_mempool_block_init(struct apbm_block_info *block, size_t block_size, int block_num)
+{
+ block->block_size = block_size;
+ block->block_num = block_num;
+#if USE_HUGE_PAGE
+ block->block_memory_head = mmap(NULL, block_num * block_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0);
+ if(MAP_FAILED == block->block_memory_head){
+ perror("dabm_mempool_init() mmap huge-page error, use glibc: ");
+ posix_memalign((void **)&block->block_memory_head, 64, block->block_num * block->block_size);
+ block->ues_hugepage = 0;
+ }else{
+ block->ues_hugepage = 1;
+ }
+#else
+ posix_memalign((void **)&block->block_memory_head, MEM_ALIGN_SIZE(), block->block_num * block->block_size);
+#endif
+ block->block_memory_tail = (char *)block->block_memory_head + block->block_num * block->block_size;
+ memset(block->block_memory_head, 0, block->block_num * block->block_size);
+ block->block_heads = (void **)calloc(block->block_num, sizeof(void *));
+ block->block_inused_flags = (int *)calloc(block->block_num, sizeof(int));
+ for(int i=0; i<block->block_num; i++)
+ {
+ block->block_heads[i] = (char *)block->block_memory_head + i * block->block_size;
+ block->block_inused_flags[i] = 0;
+ }
+ return 0;
+}
+static void apbm_mp_block_free(struct apbm_block_info *block, void *p)
+{
+ for (int i = 0; i < block->block_num; i++)
+ {
+ if((block->block_heads[i] == p)
+ && (block->block_inused_flags[i] == MEMPOOL_INUSED)){
+ memset(p, 0, block->block_size); //memset in free()
+ block->block_inused_flags[i] = MEMPOOL_USABLE;
+ return;
+ }
+ }
+ assert(0);
+}
+static void apbm_mp_block_destroy(struct apbm_block_info *block)
+{
+ if(block->block_memory_head){
+ if(block->ues_hugepage){
+ munmap(block->block_memory_head, block->block_num * block->block_size);
+ }else{
+ free(block->block_memory_head);
+ }
+ block->block_memory_head = NULL;
+ }
+ if(block->block_heads){
+ free(block->block_heads);
+ block->block_heads = NULL;
+ }
+ if(block->block_inused_flags){
+ free(block->block_inused_flags);
+ block->block_inused_flags = NULL;
+ }
+ block->block_num = 0;
+ block->block_size = 0;
+}
+static void *apbm_mp_block_alloc(struct apbm_block_info *block, int size)
+{
+ for (int i = 0; i < block->block_num; i++){
+ if(block->block_inused_flags[i] == MEMPOOL_USABLE){
+ block->block_inused_flags[i] = MEMPOOL_INUSED;
+ return block->block_heads[i];
+ }
+ }
+ return NULL;
+}
+
+void *apbm_mempool_alloc(struct apbm_mempool *mp, int size)
+{
+ void *res = NULL;
+ if(size <= mp->small.block_size){
+ res = apbm_mp_block_alloc(&mp->small, size);
+ }else if(size <= mp->large.block_size){
+ res = apbm_mp_block_alloc(&mp->large, size);
+ }
+ if(NULL == res){
+ //mempool full or size unmatch, alloc use glibc
+ res = calloc(size, sizeof(char));
+ }
+ return res;
+}
+void apbm_mempool_free(struct apbm_mempool *mp, void *p)
+{
+ if((unsigned long long)p >= (unsigned long long)mp->small.block_memory_head
+ && (unsigned long long)p <= (unsigned long long)mp->small.block_memory_tail){
+ return apbm_mp_block_free(&mp->small, p);
+ }
+ if((unsigned long long)p >= (unsigned long long)mp->large.block_memory_head
+ && (unsigned long long)p <= (unsigned long long)mp->large.block_memory_tail){
+ return apbm_mp_block_free(&mp->large, p);
+ }
+ //not allocated from mempool
+ free(p);
+}
+struct apbm_mempool *apbm_mempool_new(size_t large_block_size, int large_block_num, size_t small_block_size, int small_block_num)
+{
+ struct apbm_mempool *mp = (struct apbm_mempool *)calloc(1, sizeof(struct apbm_mempool));
+ memset(mp, 0, sizeof(struct apbm_mempool));
+ apbm_mempool_block_init(&mp->large, large_block_size, large_block_num);
+ apbm_mempool_block_init(&mp->small, small_block_size, small_block_num);
+ return mp;
+}
+void apbm_mempool_destroy(struct apbm_mempool *mp)
+{
+ apbm_mp_block_destroy(&mp->large);
+ apbm_mp_block_destroy(&mp->small);
+ free(mp);
+}
+#endif
+
+
/* source
* https://github.com/RedisBloom/RedisBloom/blob/AgePartitionedBF/contrib/agingBloom.c
*/
#define SLICE_EXPANSION 2
-#define FILL_RATIO_THRESHOLD 0.1
+#define FILL_RATIO_THRESHOLD 0.5
#define MIN_CAPACITY 1000
-
//The test_bit_set_bit() code snap is from RedisBloom (BSD License).
//Source https://github.com/RedisBloom/RedisBloom/blob/master/deps/bloom/bloom.c
#define MODE_READ 0
#define MODE_WRITE 1
-//return 1 if the bit is already set, 0 if it was not set
-inline static int test_bit_set_bit(unsigned char *buf, uint64_t x, int mode) {
- uint64_t byte = x >> 3;
- uint8_t mask = 1 << (x % 8);
- uint8_t c = buf[byte]; // expensive memory access
+struct AP_bloom_stat{
+ size_t slice_new_num_acc;
+ size_t slice_free_num_acc;
+ size_t expand_num_acc;
+ size_t expand_max_multiple_rt;
+ size_t slice_num_rt;
+};
+struct AP_configuration
+{
+ double error;
+ long long capacity;
+ long long time_window_ms;
+ long long time_slice_num;
+ struct timeval last_cfg;
+};
+struct AP_bloom
+{
+ //high level variables determined by caller
+ struct AP_configuration cfg;
+ //low level variables caculated by the call configure
+ int hash_num; //k
+ int chain_num; //hash_num+time_slice_num
+ int default_slice_size; // in bytes
+
+ //runtime variables
+ int cursor;
+ struct timeval last_slide;
+ struct ap_slice **heads; //hash_num+timeframe_num
+
+ struct AP_bloom_stat stat;
+ struct apbm_mempool *mp;
+};
+
+//return 1 if the bit is already set, 0 if it was not set
+inline static int set_bit(unsigned char *buf, uint32_t bit_index, uint32_t byte_index) {
+ uint8_t mask = 1 << (bit_index % 8);
+ uint8_t c = buf[byte_index]; // expensive memory access, maybe trigger page-fault events
if (c & mask) {
return 1;
} else {
- if (mode == MODE_WRITE) {
- buf[byte] = c | mask;
- }
+ buf[byte_index] = c | mask;
return 0;
}
}
+inline static int test_bit(const unsigned char *buf, uint32_t bit_index, uint32_t byte_index) {
+ uint8_t mask = 1 << (bit_index % 8);
+ uint8_t c = buf[byte_index]; // expensive memory access, maybe trigger page-fault events
+ return c & mask;
+}
+
struct ap_slice
{
- int hash_index;
+ unsigned int hash_index;
int popcount; //the number of 1-bits in the slice
- int slice_size; //in bytes
+ unsigned int slice_size; //in bytes
struct timeval last_insert;
struct timeval first_insert;
unsigned char * data;
struct ap_slice * next;
};
-static struct ap_slice *ap_slice_new(int slice_size, int hash_index)
+static struct ap_slice *ap_slice_new(struct AP_bloom *bloom, int slice_size, int hash_index)
{
- struct ap_slice *slice = ALLOC(struct ap_slice, 1);
+ struct ap_slice *slice = APMOOL_ALLOC(bloom->mp, struct ap_slice, 1);
slice->slice_size = slice_size;
assert(slice_size % sizeof(unsigned long long) == 0);
//ap_slice_chain_deserialize may alloc slice_size=0
- if(slice_size > 0) slice->data = ALLOC(unsigned char, slice_size);
+ if(slice_size > 0){
+ slice->data = APMOOL_ALLOC(bloom->mp, unsigned char, slice_size);
+ memset(slice->data, 0, slice_size);
+ }
slice->hash_index = hash_index;
slice->popcount = 0;
+
+ bloom->stat.slice_new_num_acc++;
+ bloom->stat.slice_num_rt++;
return slice;
}
-static void ap_slice_free(struct ap_slice *slice)
+static void ap_slice_free(struct AP_bloom *bloom, struct ap_slice *slice)
{
- free(slice->data);
- slice->data = NULL;
- free(slice);
+ APMOOL_FREE(bloom->mp, slice->data);
+ // slice->data = NULL;
+ APMOOL_FREE(bloom->mp, slice);
+ bloom->stat.slice_free_num_acc++;
+ bloom->stat.slice_num_rt--;
}
void ap_slice_chain_info(const struct ap_slice *head, double *fill_ratio, int *slice_num)
{
@@ -105,134 +298,53 @@ static int ap_slice_cmp(const struct ap_slice *a, const struct ap_slice *b)
//bigger size first
return b->slice_size - a->slice_size;
}
-static struct ap_slice * ap_slice_duplicate(const struct ap_slice *slice)
+static struct ap_slice * ap_slice_duplicate(struct AP_bloom *bloom, const struct ap_slice *slice)
{
- struct ap_slice *new_slice = ap_slice_new(slice->slice_size, slice->hash_index);
+ struct ap_slice *new_slice = ap_slice_new(bloom, slice->slice_size, slice->hash_index);
memcpy(new_slice, slice, offsetof(struct ap_slice, data));
memcpy(new_slice->data, slice->data, slice->slice_size);
return new_slice;
}
#define HASH_TO_OFFSET(hash, slice) ((hash.a + slice->hash_index * hash.b) % (slice->slice_size<<3))
-void ap_slice_add_hash(struct ap_slice *slice, struct double_hash hash, struct timeval now)
+void ap_slice_add_hash(struct ap_slice *slice, const struct double_hash *hash, struct timeval now)
{
- int index=double_hash_generate(&hash, slice->hash_index, slice->slice_size<<3);
- int added = !test_bit_set_bit(slice->data, index, MODE_WRITE);
+ unsigned int bit_index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3);
+ unsigned int byte_index = bit_index>>3;
+ __builtin_prefetch(&slice->data[byte_index], 1);
+ int added = !set_bit(slice->data, bit_index, byte_index);
+ slice->popcount += added;
slice->last_insert = now;
- if(slice->popcount == 0)
+ if(unlikely(slice->popcount == 0))
{
slice->first_insert = now;
}
- slice->popcount += added;
return;
}
-int ap_slice_check_hash(const struct ap_slice *slice, struct double_hash hash)
-{
- int index=double_hash_generate(&hash, slice->hash_index, slice->slice_size<<3);
- return test_bit_set_bit(slice->data, index, MODE_READ);
-}
-struct ap_slice_event
-{
- struct timeval timestamp;
- int slice_size;
- int hash_index;
- int event;//1: start, -1: end
-};
-int slice_event_cmp(const void *a, const void *b)
-{
- const struct ap_slice_event *ea = (struct ap_slice_event*)a;
- const struct ap_slice_event *eb = (struct ap_slice_event*)b;
- if(timercmp(&ea->timestamp, &eb->timestamp, <))
- {
- return -1;
- }
- if(timercmp(&ea->timestamp, &eb->timestamp, >))
- {
- return 1;
- }
- return eb->event - ea->event;
-}
-UT_icd slice_event_icd = {sizeof(struct ap_slice_event), NULL, NULL, NULL};
-struct ap_state
-{
- int consecutive_matches;
- int visited_slices;
- int hash_num;
- UT_array slice_time_events;
-};
-void ap_state_init(struct ap_state *state, int hash_num)
-{
- state->consecutive_matches=0;
- state->visited_slices=0;
- state->hash_num=hash_num;
- utarray_init(&state->slice_time_events, &slice_event_icd);
- utarray_reserve(&state->slice_time_events, hash_num*2);
-}
-void ap_state_clear(struct ap_state *state)
+int ap_slice_check_hash(const struct ap_slice *slice, const struct double_hash *hash)
{
- state->consecutive_matches=0;
- state->visited_slices=0;
- utarray_clear(&state->slice_time_events);
-}
-void ap_state_done(struct ap_state *state)
-{
- utarray_done(&state->slice_time_events);
-}
-int ap_state_is_match(struct ap_state *state)
-{
- int counter=0;
- struct ap_slice_event *ev=NULL;
- if(state->consecutive_matches >= state->hash_num)
- {
- if(state->visited_slices==state->consecutive_matches)//fastpath for no slice expansion
- {
- return 1;
- }
- utarray_sort(&state->slice_time_events, slice_event_cmp);
- while((ev=(struct ap_slice_event*)utarray_next(&state->slice_time_events, ev)))
- {
- counter += ev->event;
- if(counter == state->hash_num)
- {
- return 1;
- }
- }
- }
- return 0;
+ unsigned int bit_index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3);
+ unsigned int byte_index = bit_index>>3;
+ __builtin_prefetch(&slice->data[byte_index], 0);
+ return test_bit(slice->data, bit_index, byte_index);
}
-static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double_hash hash, struct ap_state *state)
+
+static int ap_slice_chain_check_hash(const struct ap_slice *head, const struct double_hash *hash)
{
+ TIME_RECORD();
//In a stackable (scalable) Bloom filter, checking for membership now involves inspecting each layer for presence.
const struct ap_slice *slice=NULL;
- struct ap_slice_event ev;
int found=0;
LL_FOREACH(head, slice)
{
if(ap_slice_check_hash(slice, hash))
{
- ev.timestamp = slice->first_insert;
- ev.event = 1;
- ev.slice_size=slice->slice_size;
- ev.hash_index=slice->hash_index;
- utarray_push_back(&state->slice_time_events, &ev);
- ev.timestamp = slice->last_insert;
- ev.event = -1;
- ev.slice_size = slice->slice_size;
- ev.hash_index = slice->hash_index;
- utarray_push_back(&state->slice_time_events, &ev);
found=1;
+ break; //quick path
}
- state->visited_slices++;
- }
- if(found)
- {
- state->consecutive_matches++;
- }
- else
- {
- ap_state_clear(state);
}
- return;
+ TIME_DIFF();
+ return found;
}
/* Return:
@@ -240,13 +352,17 @@ static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double
* 0 - element was not present and was added
* 1 - element (or a collision) had already been added previously
*/
-static void ap_slice_chain_add_hash(struct ap_slice **head, struct double_hash hash, struct timeval now)
+static void ap_slice_chain_add_hash(struct AP_bloom *bloom, struct ap_slice **head, const struct double_hash *hash, struct timeval now)
{
//add new slice if the current slice is full
- if( (double) (*head)->popcount / ((*head)->slice_size<<3) > FILL_RATIO_THRESHOLD)
+ if(unlikely( (double) (*head)->popcount / ((*head)->slice_size<<3) > FILL_RATIO_THRESHOLD))
{
- struct ap_slice *new_slice = ap_slice_new((*head)->slice_size * SLICE_EXPANSION, (*head)->hash_index);
+ struct ap_slice *new_slice = ap_slice_new(bloom, (*head)->slice_size * SLICE_EXPANSION, (*head)->hash_index);
LL_PREPEND((*head), new_slice);
+ bloom->stat.expand_num_acc++;
+ if(new_slice->slice_size/bloom->default_slice_size > bloom->stat.expand_max_multiple_rt){
+ bloom->stat.expand_max_multiple_rt = new_slice->slice_size/bloom->default_slice_size;
+ }
}
//Add it to the current (head) slice.
ap_slice_add_hash(*head, hash, now);
@@ -265,22 +381,22 @@ void ap_slice_chain_first_insert_time(const struct ap_slice *head, struct timeva
}
return;
}
-static void ap_slice_chain_free(struct ap_slice *head)
+static void ap_slice_chain_free(struct AP_bloom *bloom, struct ap_slice *head)
{
struct ap_slice *slice=NULL, *tmp=NULL;
LL_FOREACH_SAFE(head, slice, tmp)
{
LL_DELETE(head, slice);
- ap_slice_free(slice);
+ ap_slice_free(bloom, slice);
}
}
-static struct ap_slice *ap_slice_chain_duplicate(const struct ap_slice *src)
+static struct ap_slice *ap_slice_chain_duplicate(struct AP_bloom *bloom, const struct ap_slice *src)
{
struct ap_slice *new_head=NULL;
const struct ap_slice *slice=NULL;
LL_FOREACH(src, slice)
{
- struct ap_slice * new_slice=ap_slice_duplicate(slice);
+ struct ap_slice * new_slice=ap_slice_duplicate(bloom, slice);
LL_APPEND(new_head, new_slice);
}
return new_head;
@@ -296,6 +412,16 @@ static size_t ap_slice_chain_mem_size(const struct ap_slice *head)
}
return sz;
}
+static size_t ap_slice_chain_mem_blocks(const struct ap_slice *head)
+{
+ size_t blocks=0;
+ const struct ap_slice *slice=NULL;
+ LL_FOREACH(head, slice)
+ {
+ blocks += 2;
+ }
+ return blocks;
+}
struct slice_chain_header
{
int chain_sequence;
@@ -341,14 +467,14 @@ static size_t ap_slice_chain_serialize(const struct ap_slice *head, int chain_se
assert(offset<=buffer_sz);
return offset;
}
-static struct ap_slice *ap_slice_chain_deserialize(const char *buffer, size_t buffer_sz)
+static struct ap_slice *ap_slice_chain_deserialize(struct AP_bloom *bloom, const char *buffer, size_t buffer_sz)
{
struct slice_chain_header *header=(struct slice_chain_header*)buffer;
size_t offset=sizeof(struct slice_chain_header);
struct ap_slice *head=NULL;
for(size_t i=0; i<header->slice_number; i++)
{
- struct ap_slice *slice = ap_slice_new(0, 0);
+ struct ap_slice *slice = ap_slice_new(bloom, 0, 0);
memcpy(slice, buffer+offset, offsetof(struct ap_slice, data));
offset += offsetof(struct ap_slice, data);
slice->data = (unsigned char*) malloc(slice->slice_size);
@@ -362,7 +488,7 @@ static struct ap_slice *ap_slice_chain_deserialize(const char *buffer, size_t bu
}
return head;
}
-static void ap_slice_merge(struct ap_slice *dst, const struct ap_slice *src)
+static void ap_slice_merge(struct AP_bloom *bloom, struct ap_slice *dst, const struct ap_slice *src)
{
assert(dst->hash_index == src->hash_index);
assert(dst->slice_size == src->slice_size);
@@ -393,7 +519,7 @@ static void ap_slice_merge(struct ap_slice *dst, const struct ap_slice *src)
}
return;
}
-static void ap_slice_chain_merge(struct ap_slice **dst_head, const struct ap_slice *src_head)
+static void ap_slice_chain_merge(struct AP_bloom *bloom, struct ap_slice **dst_head, const struct ap_slice *src_head)
{
const struct ap_slice *src_slice=NULL;
int merged=0;
@@ -409,43 +535,20 @@ static void ap_slice_chain_merge(struct ap_slice **dst_head, const struct ap_sli
{
if(src_slice->slice_size == dst_slice->slice_size)
{
- ap_slice_merge(dst_slice, src_slice);
+ ap_slice_merge(bloom, dst_slice, src_slice);
merged=1;
break;
}
}
if(!merged)
{
- struct ap_slice *new_slice = ap_slice_duplicate(src_slice);
+ struct ap_slice *new_slice = ap_slice_duplicate(bloom, src_slice);
LL_APPEND(*dst_head, new_slice);
}
}
LL_SORT(*dst_head, ap_slice_cmp);
ap_slice_sanity(*dst_head);
}
-struct AP_configuration
-{
- double error;
- long long capacity;
- long long time_window_ms;
- long long time_slice_num;
- struct timeval last_cfg;
-};
-struct AP_bloom
-{
- //high level variables determined by caller
- struct AP_configuration cfg;
-
- //low level variables caculated by the call configure
- int hash_num; //k
- int chain_num; //hash_num+time_slice_num
- int default_slice_size; // in bytes
-
- //runtime variables
- int cursor;
- struct timeval last_slide;
- struct ap_slice **heads; //hash_num+timeframe_num
-};
#define DBL_EPSILON 2.2204460492503131e-16
bool definitelyLessThan(float a, float b)
{
@@ -470,18 +573,26 @@ struct AP_bloom *AP_bloom_new(struct timeval now, double error_rate, long long c
bloom->cfg.last_cfg = now;
bloom->cfg.time_slice_num = time_slice_num;
- bloom->last_slide = now;
bloom->cursor = 0;
bloom->default_slice_size = ceil(capacity/log(2));
bloom->default_slice_size = ceil((double)bloom->default_slice_size/64)*64/8;
+ if(bloom->default_slice_size % MEM_ALIGN_SIZE() != 0){
+ bloom->default_slice_size = (bloom->default_slice_size / MEM_ALIGN_SIZE() + 1) * MEM_ALIGN_SIZE();
+ }
+ assert(0 == bloom->default_slice_size % MEM_ALIGN_SIZE());
+#if APBM_USE_MEMPOOL
+ bloom->mp = apbm_mempool_new(bloom->default_slice_size, MEM_POOL_INIT_BLOCK_NUM, 128, MEM_POOL_INIT_BLOCK_NUM);
+#endif
bloom->chain_num = bloom->hash_num + bloom->cfg.time_slice_num;
- bloom->heads = ALLOC(struct ap_slice*, bloom->chain_num);
+ bloom->heads = APMOOL_ALLOC(bloom->mp, struct ap_slice*, bloom->chain_num);
for(int i=0; i<bloom->chain_num; i++)
{
- bloom->heads[i] = ap_slice_new(bloom->default_slice_size, i % bloom->chain_num);
+ bloom->heads[i] = ap_slice_new(bloom, bloom->default_slice_size, i % bloom->chain_num);
}
-
+ bloom->stat.expand_max_multiple_rt = 1;
+ bloom->stat.slice_num_rt = bloom->chain_num;
+ bloom->last_slide = now;
return bloom;
}
@@ -490,23 +601,25 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now)
int chain_num = bloom->chain_num;
long long slide_time_frame_us=bloom->cfg.time_slice_num?(bloom->cfg.time_window_ms*1000/bloom->cfg.time_slice_num):INT64_MAX;
long long slide_us=timeval_delta_us(bloom->last_slide, now);
- long long elapse_us=timeval_delta_us(bloom->cfg.last_cfg, now);
- long long epoches=elapse_us/slide_time_frame_us;
- if(slide_us < slide_time_frame_us)
+ if(likely(slide_us < slide_time_frame_us))
{
return;
}
+ TIME_RECORD();
+ long long elapse_us=timeval_delta_us(bloom->cfg.last_cfg, now);
+ long long epoches=elapse_us/slide_time_frame_us;
slide_us -= slide_us % slide_time_frame_us;
int n_slide = slide_us/slide_time_frame_us;
-
+ int reset_hash_index;
if(n_slide < bloom->cfg.time_slice_num)
{
for(int i=0; i<n_slide; i++)
{
int reset_idx = (bloom->cursor + bloom->hash_num) % chain_num;
- struct ap_slice *slice = ap_slice_new(bloom->default_slice_size, bloom->heads[reset_idx]->hash_index);
- ap_slice_chain_free(bloom->heads[reset_idx]);
+ reset_hash_index = bloom->heads[reset_idx]->hash_index;
+ ap_slice_chain_free(bloom, bloom->heads[reset_idx]);
+ struct ap_slice *slice = ap_slice_new(bloom, bloom->default_slice_size, reset_hash_index);
bloom->heads[reset_idx] = slice;
bloom->cursor = (bloom->cursor + 1) % chain_num;
}
@@ -515,76 +628,90 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now)
{
for(int i=0; i<chain_num; i++)
{
- struct ap_slice *new_slice=ap_slice_new(bloom->default_slice_size, bloom->heads[i]->hash_index);
- ap_slice_chain_free(bloom->heads[i]);
+ reset_hash_index = bloom->heads[i]->hash_index;
+ ap_slice_chain_free(bloom, bloom->heads[i]);
+ struct ap_slice *new_slice=ap_slice_new(bloom, bloom->default_slice_size, reset_hash_index);
bloom->heads[i] = new_slice;
}
bloom->cursor = (bloom->cursor + n_slide) % chain_num;
}
+
struct timeval slide_time;
slide_time.tv_sec = slide_us/1000/1000;
slide_time.tv_usec = slide_us%(1000*1000);
timeradd(&bloom->last_slide, &slide_time, &bloom->last_slide);
assert(bloom->cursor == (epoches % chain_num) || bloom->cursor == (epoches % chain_num + 1) % chain_num);
+ TIME_DIFF();
return;
}
void AP_bloom_add(struct AP_bloom *bloom, struct timeval now, const char *buffer, int len)
{
- struct double_hash hash;
- double_hash_init(&hash, buffer, len);
- if(bloom->cfg.time_window_ms)
+ TIME_RECORD();
+ struct double_hash hash;
+ double_hash_init(&hash, buffer, (size_t)len);
+ if(bloom->cfg.time_slice_num && bloom->cfg.time_window_ms)
{
slide_time(bloom, now);
}
for(int i=0; i<bloom->hash_num; i++)
{
int idx=(bloom->cursor+i) % (bloom->chain_num);
- ap_slice_chain_add_hash(&(bloom->heads[idx]), hash, now);
+ ap_slice_chain_add_hash(bloom,&(bloom->heads[idx]), &hash, now);
}
+ TIME_DIFF();
return;
}
int AP_bloom_check(const struct AP_bloom *bloom, struct timeval now, const char *buffer, int len)
{
- if(timeval_delta_ms(bloom->cfg.last_cfg, now) < 0)
+ if(unlikely(timeval_delta_ms(bloom->cfg.last_cfg, now) < 0))
{
return 0;
}
+ TIME_RECORD();
struct double_hash hash;
- double_hash_init(&hash, buffer, len);
+ double_hash_init(&hash, buffer, (size_t)len);
int chain_num = bloom->chain_num;
-
- struct ap_state state;
- ap_state_init(&state, bloom->hash_num);
- for(int i = 0; i < bloom->hash_num+chain_num; i++)
+ int match_num = 0, unmatch_num = 0;
+ for(int i = 0; i < chain_num; i++)
{
- long long delta_us = timeval_delta_us(bloom->heads[i%chain_num]->last_insert, now);
- if(bloom->cfg.time_window_ms && delta_us > bloom->cfg.time_window_ms*1000)
+ long long delta_us = timeval_delta_us(bloom->heads[i]->last_insert, now);
+ if(unlikely((delta_us > bloom->cfg.time_window_ms*1000)
+ && bloom->cfg.time_window_ms))
{
- ap_state_clear(&state);
continue;
}
- ap_slice_chain_check_hash(bloom->heads[i%chain_num], hash, &state);
- if(ap_state_is_match(&state))
- {
- ap_state_done(&state);
- return 1;
+ if(ap_slice_chain_check_hash(bloom->heads[i], &hash)){
+ match_num++;
+ }else{
+ unmatch_num++;
+ }
+ if(unmatch_num > (chain_num - bloom->hash_num)){
+ return 0; //quick path for check false
}
}
- ap_state_done(&state);
+ if(match_num >= bloom->hash_num)
+ {
+ TIME_DIFF();
+ return 1;
+ }
+ TIME_DIFF();
return 0;
}
void AP_bloom_free(struct AP_bloom *bloom)
{
for(int i=0; i<bloom->chain_num; i++)
{
- ap_slice_chain_free(bloom->heads[i]);
+ ap_slice_chain_free(bloom, bloom->heads[i]);
bloom->heads[i]=NULL;
}
- free(bloom->heads);
- bloom->heads=NULL;
+ APMOOL_FREE(bloom->mp, bloom->heads);
+
+#if APBM_USE_MEMPOOL
+ apbm_mempool_destroy(bloom->mp);
+#endif
free(bloom);
}
size_t AP_bloom_mem_size(const struct AP_bloom *bloom)
@@ -598,6 +725,17 @@ size_t AP_bloom_mem_size(const struct AP_bloom *bloom)
}
return sz;
}
+size_t AP_bloom_mem_blocks(const struct AP_bloom *bloom)
+{
+ size_t blocks=0;
+ blocks++;
+ blocks += bloom->chain_num;
+ for(int i=0; i<bloom->chain_num; i++)
+ {
+ blocks += ap_slice_chain_mem_blocks(bloom->heads[i]);
+ }
+ return blocks;
+}
size_t AP_bloom_serialize_size(const struct AP_bloom *bloom)
{
size_t sz=0;
@@ -625,14 +763,14 @@ void AP_bloom_serialize(const struct AP_bloom *bloom, char **blob, size_t *blob_
}
struct AP_bloom * AP_bloom_deserialize(const char *blob, size_t blob_sz)
{
- struct AP_bloom *bloom=ALLOC(struct AP_bloom, 1);
+ struct AP_bloom *bloom=ALLOC(struct AP_bloom, 1);
size_t offset=0;
memcpy(bloom, blob+offset, offsetof(struct AP_bloom, heads));
offset += offsetof(struct AP_bloom, heads);
bloom->heads=ALLOC(struct ap_slice*, bloom->chain_num);
for(int i=0; i<bloom->chain_num; i++)
{
- bloom->heads[i]=ap_slice_chain_deserialize(blob+offset, blob_sz-offset);
+ bloom->heads[i]=ap_slice_chain_deserialize(bloom, blob+offset, blob_sz-offset);
offset += ap_slice_chain_serialize_size(bloom->heads[i]);
}
assert(offset==blob_sz);
@@ -651,7 +789,7 @@ void AP_bloom_merge(struct AP_bloom *dst, const struct AP_bloom *src)
{
for(int i=0; i<dst->chain_num; i++)
{
- ap_slice_chain_free(dst->heads[i]);
+ ap_slice_chain_free(dst, dst->heads[i]);
}
memcpy(dst, src, offsetof(struct AP_bloom, heads));
@@ -659,7 +797,7 @@ void AP_bloom_merge(struct AP_bloom *dst, const struct AP_bloom *src)
dst->heads = ALLOC(struct ap_slice*, dst->chain_num);
for(int i=0; i<dst->chain_num; i++)
{
- dst->heads[i] = ap_slice_chain_duplicate(src->heads[i]);
+ dst->heads[i] = ap_slice_chain_duplicate(dst, src->heads[i]);
}
}
return;
@@ -680,7 +818,7 @@ void AP_bloom_merge(struct AP_bloom *dst, const struct AP_bloom *src)
{
continue;
}
- ap_slice_chain_merge(&(dst->heads[i]), src->heads[i]);
+ ap_slice_chain_merge(dst,&(dst->heads[i]), src->heads[i]);
}
return;
}
@@ -766,4 +904,29 @@ void AP_bloom_info(const struct AP_bloom *bloom, struct AP_bloom_info *info)
info->approximate_item_num = approximate_item_num(bloom);
info->oldest_item_time = oldest_item;
return;
+}
+
+size_t AP_bloom_stat_get_hash_num(const struct AP_bloom *bloom)
+{
+ return bloom->hash_num;
+}
+size_t AP_bloom_stat_get_new_num(const struct AP_bloom *bloom)
+{
+ return bloom->stat.slice_new_num_acc;
+}
+size_t AP_bloom_stat_get_free_num(const struct AP_bloom *bloom)
+{
+ return bloom->stat.slice_free_num_acc;
+}
+size_t AP_bloom_stat_get_expand_num(const struct AP_bloom *bloom)
+{
+ return bloom->stat.expand_num_acc;
+}
+size_t AP_bloom_stat_get_cur_slice_num(const struct AP_bloom *bloom)
+{
+ return bloom->stat.slice_num_rt;
+}
+size_t AP_bloom_stat_get_expand_max_multiple(const struct AP_bloom *bloom)
+{
+ return bloom->stat.expand_max_multiple_rt;
} \ No newline at end of file
diff --git a/CRDT/ap_bloom.h b/CRDT/ap_bloom.h
index c40b5b9..73c6502 100644
--- a/CRDT/ap_bloom.h
+++ b/CRDT/ap_bloom.h
@@ -73,6 +73,14 @@ struct AP_bloom *AP_bloom_replicate(uuid_t uuid, const char *blob, size_t blob_s
size_t AP_bloom_serialized_size(const struct AP_bloom *bloom);
size_t AP_bloom_mem_size(const struct AP_bloom *bloom);
+size_t AP_bloom_stat_get_hash_num(const struct AP_bloom *bloom);
+size_t AP_bloom_mem_blocks(const struct AP_bloom *bloom);
+size_t AP_bloom_stat_get_new_num(const struct AP_bloom *bloom);
+size_t AP_bloom_stat_get_free_num(const struct AP_bloom *bloom);
+size_t AP_bloom_stat_get_expand_num(const struct AP_bloom *bloom);
+size_t AP_bloom_stat_get_cur_slice_num(const struct AP_bloom *bloom);
+size_t AP_bloom_stat_get_expand_max_multiple(const struct AP_bloom *bloom);
+
#ifdef __cplusplus
}
#endif \ No newline at end of file
diff --git a/CRDT/ap_bloom_gtest.cpp b/CRDT/ap_bloom_gtest.cpp
index b0f10d9..0f105c4 100644
--- a/CRDT/ap_bloom_gtest.cpp
+++ b/CRDT/ap_bloom_gtest.cpp
@@ -18,7 +18,7 @@
static unsigned int BM_CAPACITY = 10000000;
static double BM_ERROR_RATE = 0.000001d;
static int BM_TIMEOUT = 10 * 1000; // ms
-static int BM_SLICE_NUM = 0;
+static int BM_SLICE_NUM = 3;
static int MAX_ITEM_NUM = 10000000;
static int ITEM_BATCH_NUM = 1000;
#define TUPLE4_ADDR_LEN (12 + 1) // sizeof(tuple4) add begin char 'Y' or 'N'
@@ -89,7 +89,9 @@ static void bm_add_item(struct AP_bloom *bloom_filter, char *tuple4_buf, unsigne
}
total_add_time += (unsigned long long)time_diff;
- fieldstat_easy_histogram_record(fs4_instance, 0, fs4_add_metric_id, &FS4_HISGRAM_TAG, 1, time_diff);
+ if(fs4_instance){
+ fieldstat_easy_histogram_record(fs4_instance, 0, fs4_add_metric_id, &FS4_HISGRAM_TAG, 1, time_diff);
+ }
}
static int bm_search_item(struct AP_bloom *bloom_filter, char *tuple4_buf, unsigned int index, int fs4_metric_id)
@@ -111,7 +113,9 @@ static int bm_search_item(struct AP_bloom *bloom_filter, char *tuple4_buf, unsig
}
total_search_time += (unsigned long long)time_diff;
- fieldstat_easy_histogram_record(fs4_instance, 0, fs4_metric_id, &FS4_HISGRAM_TAG, 1, time_diff);
+ if(fs4_instance){
+ fieldstat_easy_histogram_record(fs4_instance, 0, fs4_metric_id, &FS4_HISGRAM_TAG, 1, time_diff);
+ }
return ret;
}
@@ -310,6 +314,65 @@ TEST(apbloom, perf)
EXPECT_EQ(0, ap_bloom_filter_test(g_user_define_args));
}
+static int apbloom_merge_test_run(void)
+{
+ int ret = 0;
+ const int max_item_num = 1000000;
+ struct timeval current_time_tv;
+ gettimeofday(&current_time_tv, NULL);
+
+ struct AP_bloom *apbma = (struct AP_bloom *)AP_bloom_new(g_current_time_tv, BM_ERROR_RATE, BM_CAPACITY, 1000 * 999, BM_SLICE_NUM);
+ assert(apbma);
+ char tuple4_buf[1024] = {};
+
+ tuple4_buf[0] = 'A';
+ for(int i = 0; i < max_item_num; i++){
+ bm_add_item(apbma, tuple4_buf, i);
+ }
+
+ struct AP_bloom *apbmb = (struct AP_bloom *)AP_bloom_new(g_current_time_tv, BM_ERROR_RATE, BM_CAPACITY, 1000 * 999, BM_SLICE_NUM);
+ assert(apbmb);
+
+ tuple4_buf[0] = 'B';
+ for(int i = 0; i < max_item_num; i++){
+ bm_add_item(apbmb, tuple4_buf, i);
+ }
+
+ tuple4_buf[0] = 'B';
+ for(int i = 0; i < max_item_num; i++){
+ ret = bm_search_item(apbma, tuple4_buf, i, -1);
+ if(ret > 0){
+ fprintf(stderr, "found tuple4 with B apbma before merge!\n");
+ return -1;
+ }
+ }
+
+ AP_bloom_merge(apbma, apbmb);
+
+ for(int i = 0; i < max_item_num; i++){
+ tuple4_buf[0] = 'A';
+ ret = bm_search_item(apbma, tuple4_buf, i, -1);
+ if(ret <= 0){
+ fprintf(stderr, "not found tuple4 with A after merge, index:%d\n", i);
+ return -1;
+ }
+ tuple4_buf[0] = 'B';
+ ret = bm_search_item(apbma, tuple4_buf, i, -1);
+ if(ret <= 0){
+ fprintf(stderr, "not found tuple4 with B after merge, index:%d\n", i);
+ return -1;
+ }
+ }
+ AP_bloom_free(apbma);
+ AP_bloom_free(apbmb);
+ return 0;
+}
+
+TEST(apbloom, merge)
+{
+ EXPECT_EQ(0, apbloom_merge_test_run());
+}
+
static const char *gtest_cla_short_options = "hLf:u:";
static const struct option gtest_cla_long_options[] =
diff --git a/CRDT/crdt_utils.c b/CRDT/crdt_utils.c
index c796081..88f823d 100644
--- a/CRDT/crdt_utils.c
+++ b/CRDT/crdt_utils.c
@@ -3,11 +3,10 @@
void double_hash_init(struct double_hash *rv, const void *buffer, int len)
{
- rv->a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c);
- rv->b=XXH3_64bits_withSeed(buffer, len, rv->a);
+ // rv->a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c);
+ // rv->b=XXH3_64bits_withSeed(buffer, len, rv->a);
+ XXH128_hash_t xxh128b = XXH3_128bits_withSeed(buffer, len, 0x9747b28c);
+ rv->a = xxh128b.high64;
+ rv->b = xxh128b.low64;
return;
-}
-int double_hash_generate(const struct double_hash *rv, int i, int m)
-{
- return (rv->a + i * rv->b) % m;
} \ No newline at end of file
diff --git a/CRDT/crdt_utils.h b/CRDT/crdt_utils.h
index 951e44b..57c34b5 100644
--- a/CRDT/crdt_utils.h
+++ b/CRDT/crdt_utils.h
@@ -37,7 +37,7 @@
// Double hashing for fast hashing,
// reference: Kirsch, Adam, and Michael Mitzenmacher. "Less hashing, same performance: Building a better bloom filter."
-// Algorithms–ESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006.
+// AlgorithmsCESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006.
// https://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf
struct double_hash
{
@@ -45,4 +45,29 @@ struct double_hash
uint64_t b;
};
void double_hash_init(struct double_hash *rv, const void *buffer, int len);
-int double_hash_generate(const struct double_hash *rv, int i, int m); \ No newline at end of file
+static inline unsigned int double_hash_generate(const struct double_hash *rv, unsigned int i, unsigned int m)
+{
+ return (rv->a + i * rv->b) % m;
+}
+
+#define TIME_MEASURE 0 //to observe delay jitter
+#if TIME_MEASURE
+#define MAX_DELAY (100000) //ns
+#define TIME_RECORD() struct timespec _start_time, _end_time; long long time_diff_ns; clock_gettime(CLOCK_REALTIME, &_start_time)
+#define TIME_DIFF() \
+ do { \
+ clock_gettime(CLOCK_REALTIME, &_end_time); \
+ if (likely(_end_time.tv_sec == _start_time.tv_sec))\
+ {\
+ time_diff_ns = (_end_time.tv_nsec - _start_time.tv_nsec);\
+ }else{\
+ time_diff_ns = ((long long)_end_time.tv_sec * 1000 * 1000 * 1000 + _end_time.tv_nsec) - ((long long)_start_time.tv_sec * 1000 * 1000 * 1000 + _start_time.tv_nsec);\
+ }\
+ if(unlikely(time_diff_ns > MAX_DELAY)){\
+ fprintf(stderr, "%s:%d, timestamp:%lld.%lld, time diff:%ld ns\n", __FILE__, __LINE__, (long long)_start_time.tv_sec, (long long)_start_time.tv_nsec, time_diff_ns);\
+ }\
+ }while (0)
+#else
+#define TIME_RECORD()
+#define TIME_DIFF()
+#endif \ No newline at end of file