diff options
| author | lijia <[email protected]> | 2024-06-07 02:07:14 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-06-07 02:07:14 +0800 |
| commit | 4005f8800a67560ec386c1388bfb1c2433c0dabd (patch) | |
| tree | 621a28354a2381653534f5cbc862285c8bffeeac | |
| parent | f82a8875043eaa55dd11cc41308d99b024bde706 (diff) | |
perf: add mempool for apbloom, simplify the AP_bloom_check() process.
| -rw-r--r-- | module_test/src/gtest_main.cpp | 5 | ||||
| -rw-r--r-- | module_test/src/gtest_sapp_bloom.cpp | 171 | ||||
| -rw-r--r-- | module_test/src/gtest_sapp_fun.h | 1 | ||||
| -rw-r--r-- | src/support/ap_bloom/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/support/ap_bloom/deps/ap_mempool.cpp | 157 | ||||
| -rw-r--r-- | src/support/ap_bloom/deps/ap_mempool.h | 28 | ||||
| -rw-r--r-- | src/support/ap_bloom/deps/crdt_utils.c | 12 | ||||
| -rw-r--r-- | src/support/ap_bloom/deps/crdt_utils.h | 54 | ||||
| -rw-r--r-- | src/support/ap_bloom/src/ap_bloom.c | 226 |
9 files changed, 439 insertions, 217 deletions
diff --git a/module_test/src/gtest_main.cpp b/module_test/src/gtest_main.cpp index bea1365..01d6b61 100644 --- a/module_test/src/gtest_main.cpp +++ b/module_test/src/gtest_main.cpp @@ -2224,6 +2224,11 @@ TEST(apbloom, simple) ASSERT_EQ(0, sapp_bloom_filter_test_run(BLOOM_LIBRARY_APBLOOM, user_define_args)); } +TEST(apbloom, merge) +{ + ASSERT_EQ(0, sapp_apbloom_merge_test_run()); +} + static const char *gtest_cla_short_options = "hvLsf:l:u:"; static const struct option gtest_cla_long_options[] = diff --git a/module_test/src/gtest_sapp_bloom.cpp b/module_test/src/gtest_sapp_bloom.cpp index 4416eb9..bfb10a9 100644 --- a/module_test/src/gtest_sapp_bloom.cpp +++ b/module_test/src/gtest_sapp_bloom.cpp @@ -1,5 +1,6 @@ #include <stdlib.h>
#include <stdio.h>
+#include <sys/time.h>
#include <unistd.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
@@ -54,10 +55,8 @@ static int APBM_SLICE_NUM = 0; #define BM_TEST_MAX_THREAD (1)
static pthread_t bm_test_thread_id[BM_TEST_MAX_THREAD];
static pthread_t timer_thread_id;
-static volatile struct timespec g_current_time_sp = {};
static volatile long long g_current_time_in_ms = 0;
static struct timeval g_current_time_tv;
-static volatile int timer_thread_run = 1;
static const unsigned int INIT_SIP = 0x12345678;
static const unsigned int INIT_DIP = 0x87654321;
@@ -82,37 +81,41 @@ static inline void bm_update_key(char *tuple4_buf, unsigned int index) *p_dport = INIT_DPORT + index;
}
-static unsigned long long max_add_time = 0; // us
-static unsigned long long total_add_time = 0; // us
-static unsigned long long max_search_time = 0; // us
-static unsigned long long total_search_time = 0; // us
+static unsigned long long max_add_time = 0; // ns
+static unsigned long long total_add_time = 0; // ns
+static unsigned long long max_search_time = 0; // ns
+static unsigned long long total_search_time = 0; // ns
// for test, only support one thread
static struct timespec start_time, end_time;
-
-// return ns
-static inline unsigned long long bm_time_diff(const struct timespec *start_time, const struct timespec *end_time)
-{
- if (start_time->tv_sec == end_time->tv_sec)
- {
- return (unsigned long long)(end_time->tv_nsec - start_time->tv_nsec);
- }
- return ((unsigned long long)end_time->tv_sec * 1000 * 1000 * 1000 + end_time->tv_nsec) - ((unsigned long long)start_time->tv_sec * 1000 *1000 *1000 + start_time->tv_nsec);
-}
+unsigned long long time_diff; //ns
+
+#define TIME_UPDATE() \
+ clock_gettime(CLOCK_REALTIME, &start_time); \
+ g_current_time_tv.tv_sec = start_time.tv_sec; \
+ g_current_time_tv.tv_usec = start_time.tv_nsec / 1000; \
+ g_current_time_in_ms = start_time.tv_sec * 1000 + start_time.tv_nsec / 1000000;
+
+#define TIME_DIFF() \
+ do { \
+ clock_gettime(CLOCK_REALTIME, &end_time); \
+ if (likely(end_time.tv_sec == start_time.tv_sec))\
+ {\
+ time_diff = (end_time.tv_nsec - start_time.tv_nsec);\
+ }else{\
+ time_diff = ((unsigned long long)end_time.tv_sec * 1000 * 1000 * 1000 + end_time.tv_nsec) - ((unsigned long long)start_time.tv_sec * 1000 * 1000 * 1000 + start_time.tv_nsec);\
+ }\
+ }while (0)
static void bm_add_item(void *bloom_filter, char *tuple4_buf, unsigned int index, const sapp_dup_pkt_t *dup_conf)
{
bm_update_key(tuple4_buf, index);
- clock_gettime(CLOCK_REALTIME, &start_time);
- g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
- g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+ TIME_UPDATE();
bloom_add(bloom_filter, tuple4_buf, TUPLE4_ADDR_LEN, dup_conf, g_current_time_tv, g_current_time_in_ms);
clock_gettime(CLOCK_REALTIME, &end_time);
-
- unsigned long long time_diff = bm_time_diff(&start_time, &end_time);
- if (time_diff > max_add_time)
- {
- max_add_time = time_diff;
+ TIME_DIFF();
+ if(time_diff > max_add_time){
+ max_add_time = (unsigned long long)time_diff;
}
total_add_time += (unsigned long long)time_diff;
fieldstat_easy_histogram_record(fs4_instance, 0, fs4_add_metric_id, &FS4_HISGRAM_TAG, 1, time_diff);
@@ -121,16 +124,12 @@ static void bm_add_item(void *bloom_filter, char *tuple4_buf, unsigned int index static int bm_search_item(void *bloom_filter, char *tuple4_buf, unsigned int index, const sapp_dup_pkt_t *dup_conf, int fs4_metric_id)
{
bm_update_key(tuple4_buf, index);
- clock_gettime(CLOCK_REALTIME, &start_time);
- g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
- g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+ TIME_UPDATE();
int ret = bloom_check(bloom_filter, tuple4_buf, TUPLE4_ADDR_LEN, dup_conf, g_current_time_tv, g_current_time_in_ms);
- clock_gettime(CLOCK_REALTIME, &end_time);
- long long time_diff = bm_time_diff(&start_time, &end_time);
- if (time_diff > max_search_time)
- {
- max_search_time = time_diff;
- }
+ TIME_DIFF();
+ if(time_diff > max_search_time){
+ max_search_time = (unsigned long long)time_diff;
+ }
total_search_time += (unsigned long long)time_diff;
fieldstat_easy_histogram_record(fs4_instance, 0, fs4_metric_id, &FS4_HISGRAM_TAG, 1, time_diff);
return ret;
@@ -171,6 +170,7 @@ static int bm_test() dup_conf.dup_pkt_distinguish_ipv4_udp = 1;
dup_conf.bloom_partition_num = BM_PARTITION_NUM;
dup_conf.transition_time_ms = BM_TRANSITION_TIME;
+ dup_conf.bloom_slice_num = APBM_SLICE_NUM;
fs4_instance = fieldstat_easy_new(1, "bm-test", NULL, 0);
fieldstat_easy_enable_auto_output(fs4_instance, "./bm_gtest_fs4.json", 1);
@@ -182,6 +182,11 @@ static int bm_test() fs4_slice_new_metric_id = fieldstat_easy_register_counter(fs4_instance, "bm_slice_new");
fs4_slice_free_metric_id = fieldstat_easy_register_counter(fs4_instance, "bm_slice_free");
+ clock_gettime(CLOCK_REALTIME, &start_time);
+ g_current_time_tv.tv_sec = start_time.tv_sec;
+ g_current_time_tv.tv_usec = start_time.tv_nsec / 1000;
+ g_current_time_in_ms = start_time.tv_sec * 1000 + start_time.tv_nsec / 1000000;
+
void *dabm_handle = bloom_new(&dup_conf, g_current_time_tv, g_current_time_in_ms);
assert(dabm_handle);
char tuple4_buf[1024] = {};
@@ -206,7 +211,7 @@ static int bm_test() BM_CAPACITY, BM_TIMEOUT, BM_PARTITION_NUM, APBM_SLICE_NUM, ITEM_BATCH_NUM, MAX_ITEM_NUM);
int ret;
int add_index = 0, search_index_y = 0, search_index_n = 0;
- while (add_index < MAX_ITEM_NUM || search_index_y < MAX_ITEM_NUM || search_index_n < MAX_ITEM_NUM)
+ while (add_index < MAX_ITEM_NUM || search_index_y < MAX_ITEM_NUM || search_index_n < MAX_ITEM_NUM )
{
tuple4_buf[0] = 'Y';
for (int b = 0; b < ITEM_BATCH_NUM && add_index < MAX_ITEM_NUM; b++, add_index++)
@@ -263,30 +268,12 @@ static void *bm_test_thread(void *arg) return (void *)"success";
}
-static inline long long get_curtime_in_us(void)
-{
- struct timespec curts;
- clock_gettime(CLOCK_MONOTONIC, &curts);
- return curts.tv_sec * 1000000 + curts.tv_nsec / 1000;
-}
-
static inline void nssleep(long ns)
{
struct timespec req = {0, ns};
nanosleep(&req, NULL);
}
-static void *timer_thread(void *arg)
-{
- while (timer_thread_run)
- {
- clock_gettime(CLOCK_REALTIME, (struct timespec *)&g_current_time_sp);
- g_current_time_in_ms = g_current_time_sp.tv_sec * 1000 + g_current_time_sp.tv_nsec / 1000000;
- nssleep(10);
- }
- return NULL;
-}
-
static void bm_test_usage(void)
{
printf("BM test usage:\n");
@@ -329,7 +316,7 @@ static int convert_user_args(const char *user_args) if (strcmp(key, "capacity") == 0)
{
BM_CAPACITY = (unsigned int )strtoull(value, NULL, 10);
- }
+ }
else if (strcmp(key, "error_rate") == 0)
{
BM_ERROR_RATE = strtod(value, NULL);
@@ -372,8 +359,6 @@ int sapp_bloom_filter_test_run(int bm_liarary, const char *user_define_args) BM_LIBRARY = bm_liarary;
convert_user_args(user_define_args);
- pthread_create(&timer_thread_id, NULL, timer_thread, NULL);
-
for (int i = 0; i < BM_TEST_MAX_THREAD; i++)
{
pthread_create(&bm_test_thread_id[i], NULL, bm_test_thread, NULL);
@@ -389,9 +374,79 @@ int sapp_bloom_filter_test_run(int bm_liarary, const char *user_define_args) ret = -1;
}
}
- timer_thread_run = 0;
- pthread_cancel(timer_thread_id);
- pthread_join(timer_thread_id, NULL);
return ret;
}
+
+int sapp_apbloom_merge_test_run(void)
+{
+ int ret = 0;
+ const int max_item_num = 100000;
+ sapp_dup_pkt_t dup_conf = {};
+ dup_conf.bloom_capacity = 1000000;
+ dup_conf.bloom_error_rate = 0.000001;
+ dup_conf.bloom_timeout_ms = 30*1000;
+ dup_conf.bloom_library = (enum bloom_library)BLOOM_LIBRARY_APBLOOM;
+ dup_conf.kickout_udp_stream_enabled = 1;
+ dup_conf.dup_pkt_distinguish_all_inject = 1;
+ dup_conf.dup_pkt_distinguish_ipv4_tcp = 1;
+ dup_conf.dup_pkt_distinguish_ipv4_udp = 1;
+ dup_conf.bloom_slice_num = 1;
+
+ TIME_UPDATE();
+
+ struct AP_bloom *apbma = (struct AP_bloom *)bloom_new(&dup_conf, g_current_time_tv, g_current_time_in_ms);
+ assert(apbma);
+ char tuple4_buf[1024] = {};
+
+ tuple4_buf[0] = 'A';
+ for(int i = 0; i < max_item_num; i++){
+ bm_update_key(tuple4_buf, i);
+ bloom_add(apbma, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ }
+
+ struct AP_bloom *apbmb = (struct AP_bloom *)bloom_new(&dup_conf, g_current_time_tv, g_current_time_in_ms);
+ assert(apbmb);
+
+ tuple4_buf[0] = 'B';
+ for(int i = 0; i < max_item_num; i++){
+ TIME_UPDATE();
+ bm_update_key(tuple4_buf, i);
+ bloom_add(apbmb, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ }
+
+ tuple4_buf[0] = 'B';
+ for(int i = 0; i < max_item_num; i++){
+ TIME_UPDATE();
+ bm_update_key(tuple4_buf, i);
+ ret = bloom_check(apbma, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ if(ret > 0){
+ fprintf(stderr, "found tuple4 with B apbma before merge!\n");
+ return -1;
+ }
+ }
+
+ AP_bloom_merge(apbma, apbmb);
+
+ for(int i = 0; i < max_item_num; i++){
+ TIME_UPDATE();
+
+ tuple4_buf[0] = 'A';
+ bm_update_key(tuple4_buf, i);
+ ret = bloom_check(apbma, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ if(ret <= 0){
+ fprintf(stderr, "not found tuple4 with A!\n");
+ return -1;
+ }
+ tuple4_buf[0] = 'B';
+ bm_update_key(tuple4_buf, i);
+ ret = bloom_check(apbma, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ if(ret <= 0){
+ fprintf(stderr, "not found tuple4 with B!\n");
+ return -1;
+ }
+ }
+ AP_bloom_free(apbma);
+ AP_bloom_free(apbmb);
+ return 0;
+}
diff --git a/module_test/src/gtest_sapp_fun.h b/module_test/src/gtest_sapp_fun.h index 473eb6d..f597a0e 100644 --- a/module_test/src/gtest_sapp_fun.h +++ b/module_test/src/gtest_sapp_fun.h @@ -570,6 +570,7 @@ void sapp_deal_proxy_kill_tcp_run(void); int test_pkt_dump_run(void); int sapp_bloom_filter_test_run(int bm_lib, const char *args); +int sapp_apbloom_merge_test_run(void); void append_entry_list(const char *entryname); int check_sapp_version(void); diff --git a/src/support/ap_bloom/CMakeLists.txt b/src/support/ap_bloom/CMakeLists.txt index e89f0ff..c6c0424 100644 --- a/src/support/ap_bloom/CMakeLists.txt +++ b/src/support/ap_bloom/CMakeLists.txt @@ -3,4 +3,4 @@ cmake_minimum_required(VERSION 2.8...3.10) add_definitions(-fPIC) include_directories(${CMAKE_SOURCE_DIR}/src/support/ap_bloom/deps) -add_library(ap_bloom STATIC deps/xxhash.c deps/crdt_utils.c src/ap_bloom.c)
\ No newline at end of file +add_library(ap_bloom STATIC deps/xxhash.c deps/crdt_utils.c src/ap_bloom.c deps/ap_mempool.cpp)
\ No newline at end of file diff --git a/src/support/ap_bloom/deps/ap_mempool.cpp b/src/support/ap_bloom/deps/ap_mempool.cpp new file mode 100644 index 0000000..28f94a0 --- /dev/null +++ b/src/support/ap_bloom/deps/ap_mempool.cpp @@ -0,0 +1,157 @@ +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <time.h> +#include <assert.h> +#include <pthread.h> +#include <sys/mman.h> +#include "ap_mempool.h" + +#ifndef MAX +#define MAX(a, b) (((a) > (b)) ? (a) : (b)) +#endif +#ifndef MIN +#define MIN(a, b) (((a) < (b)) ? (a) : (b)) +#endif + +struct apbm_block_info{ + void *block_memory_head; + void *block_memory_tail; + void **block_heads; + int *block_inused_flags; + unsigned int block_size; + unsigned int block_num; + int ues_hugepage; + int to_flush_block_num; +}; +struct apbm_mempool{ + struct apbm_block_info large; + struct apbm_block_info small; +}; +enum mempool_flag{ + MEMPOOL_USABLE = 0, + MEMPOOL_INUSED = 1, +}; + +static int apbm_mempool_block_init(struct apbm_block_info *block, size_t block_size, int block_num) +{ + block->block_size = block_size; + block->block_num = block_num; +#if USE_HUGE_PAGE + block->block_memory_head = mmap(NULL, block_num * block_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB, -1, 0); + if(MAP_FAILED == block->block_memory_head){ + perror("dabm_mempool_init() mmap huge-page error, use glibc: "); + posix_memalign((void **)&block->block_memory_head, 64, block->block_num * block->block_size); + block->ues_hugepage = 0; + }else{ + block->ues_hugepage = 1; + } +#else + posix_memalign((void **)&block->block_memory_head, MEM_ALIGN_SIZE(), block->block_num * block->block_size); +#endif + block->block_memory_tail = (char *)block->block_memory_head + block->block_num * block->block_size; + memset(block->block_memory_head, 0, block->block_num * block->block_size); + block->block_heads = (void **)calloc(block->block_num, sizeof(void *)); + block->block_inused_flags = (int *)calloc(block->block_num, sizeof(int)); + for(int i=0; i<block->block_num; i++) + { + block->block_heads[i] = (char *)block->block_memory_head + i * block->block_size; + block->block_inused_flags[i] = 0; + } + return 0; +} +static void apbm_mp_block_free(struct apbm_block_info *block, void *p) +{ + for (int i = 0; i < block->block_num; i++) + { + if((block->block_heads[i] == p) + && (block->block_inused_flags[i] == MEMPOOL_INUSED)){ + memset(p, 0, block->block_size); //memset in free() + block->block_inused_flags[i] = MEMPOOL_USABLE; + return; + } + } + assert(0); +} +static void apbm_mp_block_destroy(struct apbm_block_info *block) +{ + if(block->block_memory_head){ + if(block->ues_hugepage){ + munmap(block->block_memory_head, block->block_num * block->block_size); + }else{ + free(block->block_memory_head); + } + block->block_memory_head = NULL; + } + if(block->block_heads){ + free(block->block_heads); + block->block_heads = NULL; + } + if(block->block_inused_flags){ + free(block->block_inused_flags); + block->block_inused_flags = NULL; + } + block->block_num = 0; + block->block_size = 0; +} +static void *apbm_mp_block_alloc(struct apbm_block_info *block, int size) +{ + for (int i = 0; i < block->block_num; i++){ + if(block->block_inused_flags[i] == MEMPOOL_USABLE){ + block->block_inused_flags[i] = MEMPOOL_INUSED; + return block->block_heads[i]; + } + } + return NULL; +} + +#ifdef __cplusplus +extern "C" +{ +#endif +void *apbm_mempool_alloc(struct apbm_mempool *mp, int size) +{ + void *res = NULL; + if(size <= mp->small.block_size){ + res = apbm_mp_block_alloc(&mp->small, size); + }else if(size <= mp->large.block_size){ + res = apbm_mp_block_alloc(&mp->large, size); + } + if(NULL == res){ + //mempool full or size unmatch, alloc use glibc + res = calloc(size, sizeof(char)); + } + return res; +} +void apbm_mempool_free(struct apbm_mempool *mp, void *p) +{ + if((unsigned long long)p >= (unsigned long long)mp->small.block_memory_head + && (unsigned long long)p <= (unsigned long long)mp->small.block_memory_tail){ + return apbm_mp_block_free(&mp->small, p); + } + if((unsigned long long)p >= (unsigned long long)mp->large.block_memory_head + && (unsigned long long)p <= (unsigned long long)mp->large.block_memory_tail){ + return apbm_mp_block_free(&mp->large, p); + } + //not allocated from mempool + free(p); +} +struct apbm_mempool *apbm_mempool_new(size_t large_block_size, int large_block_num, size_t small_block_size, int small_block_num) +{ + struct apbm_mempool *mp = (struct apbm_mempool *)calloc(1, sizeof(struct apbm_mempool)); + memset(mp, 0, sizeof(struct apbm_mempool)); + apbm_mempool_block_init(&mp->large, large_block_size, large_block_num); + apbm_mempool_block_init(&mp->small, small_block_size, small_block_num); + return mp; +} +void apbm_mempool_destroy(struct apbm_mempool *mp) +{ + apbm_mp_block_destroy(&mp->large); + apbm_mp_block_destroy(&mp->small); + free(mp); +} +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/src/support/ap_bloom/deps/ap_mempool.h b/src/support/ap_bloom/deps/ap_mempool.h new file mode 100644 index 0000000..73bc94a --- /dev/null +++ b/src/support/ap_bloom/deps/ap_mempool.h @@ -0,0 +1,28 @@ +#ifndef _AP_MEMPOOL_H_ +#define _AP_MEMPOOL_H_ 1 +#include <stdint.h> +#include <unistd.h> +#ifdef __cplusplus +extern "C" +{ +#endif +#define USE_HUGE_PAGE (0) //precondition : sysctl -w vm.nr_hugepages=128,256,512... or other number by your needs +#define MEM_POOL_INIT_BLOCK_NUM (24) +#define MEM_ALIGN_SIZE() (getpagesize() > 4096 ? 4096: getpagesize()) +struct apbm_mempool; +/* + * FYI: + * if error_rate set to 0.000001, slice num is 20 + * if capacity set to 1000000, slice size is 180344 + * if capacity set to 5000000, slice size is 901688 + * if capacity set to 10000000, slice size is 1803376 + */ +struct apbm_mempool *apbm_mempool_new(size_t large_block_size, int large_block_num, size_t small_block_size, int small_block_num); +void apbm_mempool_destroy(struct apbm_mempool *mp); +void *apbm_mempool_alloc(struct apbm_mempool *mp, int size); +void apbm_mempool_free(struct apbm_mempool *mp, void *p); +#ifdef __cplusplus +} +#endif + +#endif
\ No newline at end of file diff --git a/src/support/ap_bloom/deps/crdt_utils.c b/src/support/ap_bloom/deps/crdt_utils.c index 012896c..ab93224 100644 --- a/src/support/ap_bloom/deps/crdt_utils.c +++ b/src/support/ap_bloom/deps/crdt_utils.c @@ -1,12 +1,2 @@ #include "crdt_utils.h" -#include "xxhash.h" - -void double_hash_init(struct double_hash *rv, const void *buffer, int len) -{ - // rv->a=XXH3_64bits_withSeed(buffer, len, 0x9747b28c); - // rv->b=XXH3_64bits_withSeed(buffer, len, rv->a); - XXH128_hash_t xxh128b = XXH3_128bits_withSeed(buffer, len, 0x9747b28c); - rv->a = xxh128b.high64; - rv->b = xxh128b.low64; - return; -}
\ No newline at end of file +#include "xxhash.h"
\ No newline at end of file diff --git a/src/support/ap_bloom/deps/crdt_utils.h b/src/support/ap_bloom/deps/crdt_utils.h index c7eac90..eaee851 100644 --- a/src/support/ap_bloom/deps/crdt_utils.h +++ b/src/support/ap_bloom/deps/crdt_utils.h @@ -5,7 +5,16 @@ #include <stdint.h> //uint64_t #define ALLOC(type, number) ((type *)calloc(sizeof(type), number)) -#define FREE(p) {free(*p);*p=NULL;} +#define FREE(p) {free(p); p=NULL;} + +#define APBM_USE_MEMPOOL 1 +#if APBM_USE_MEMPOOL +#define APMOOL_ALLOC(mp, type, number) ((type *)apbm_mempool_alloc(mp, sizeof(type)*(number))) +#define APMOOL_FREE(mp, p) {apbm_mempool_free(mp, p); p=NULL;} +#else +#define APMOOL_ALLOC(mp, type, number) ALLOC(type, number) +#define APMOOL_FREE(mp, p) FREE(p) +#endif #define TRUE 1 #define FALSE 0 @@ -34,19 +43,54 @@ #define unlikely(x) __builtin_expect((x),0) +#ifdef __cplusplus +extern "C" +{ +#endif // Double hashing for fast hashing, // reference: Kirsch, Adam, and Michael Mitzenmacher. "Less hashing, same performance: Building a better bloom filter." -// Algorithms–ESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006. +// Algorithms�CESA 2006: 14th Annual European Symposium, Zurich, Switzerland, September 11-13, 2006. Proceedings 14. Springer Berlin Heidelberg, 2006. // https://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf struct double_hash { uint64_t a; uint64_t b; }; -void double_hash_init(struct double_hash *rv, const void *buffer, int len); +// void double_hash_init(struct double_hash *rv, const void *buffer, size_t len); // int double_hash_generate(const struct double_hash *rv, int i, int m); -static inline int double_hash_generate(const struct double_hash *rv, int i, int m) +#define double_hash_init(rv, buffer, len) \ + XXH128_hash_t xxh128b = XXH3_128bits_withSeed(buffer, len, 0x9747b28c); \ + rv.a = xxh128b.high64; \ + rv.b = xxh128b.low64; + +static inline unsigned int double_hash_generate(const struct double_hash *rv, unsigned int i, unsigned int m) { return (rv->a + i * rv->b) % m; -}
\ No newline at end of file +} + +#define TIME_MEASURE 0 //to observe delay jitter +#if TIME_MEASURE +#define MAX_DELAY (100000) //ns +#define TIME_RECORD() struct timespec _start_time, _end_time; long long time_diff_ns; clock_gettime(CLOCK_REALTIME, &_start_time) +#define TIME_DIFF() \ + do { \ + clock_gettime(CLOCK_REALTIME, &_end_time); \ + if (likely(_end_time.tv_sec == _start_time.tv_sec))\ + {\ + time_diff_ns = (_end_time.tv_nsec - _start_time.tv_nsec);\ + }else{\ + time_diff_ns = ((long long)_end_time.tv_sec * 1000 * 1000 * 1000 + _end_time.tv_nsec) - ((long long)_start_time.tv_sec * 1000 * 1000 * 1000 + _start_time.tv_nsec);\ + }\ + if(unlikely(time_diff_ns > MAX_DELAY)){\ + fprintf(stderr, "%s:%d, timestamp:%lld.%lld, time diff:%ld ns\n", __FILE__, __LINE__, (long long)_start_time.tv_sec, (long long)_start_time.tv_nsec, time_diff_ns);\ + }\ + }while (0) +#else +#define TIME_RECORD() +#define TIME_DIFF() +#endif + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/src/support/ap_bloom/src/ap_bloom.c b/src/support/ap_bloom/src/ap_bloom.c index 9cd1f9f..df6a0a8 100644 --- a/src/support/ap_bloom/src/ap_bloom.c +++ b/src/support/ap_bloom/src/ap_bloom.c @@ -2,7 +2,8 @@ #include "crdt_utils.h" #include "utlist.h" #include "utarray.h" - +#include "ap_mempool.h" +#include "xxhash.h" #include <math.h> // log, ceil #include <stdio.h> // printf #include <assert.h> // assert @@ -19,7 +20,6 @@ #define FILL_RATIO_THRESHOLD 0.5 #define MIN_CAPACITY 1000 - //The test_bit_set_bit() code snap is from RedisBloom (BSD License). //Source https://github.com/RedisBloom/RedisBloom/blob/master/deps/bloom/bloom.c #define MODE_READ 0 @@ -56,29 +56,32 @@ struct AP_bloom struct ap_slice **heads; //hash_num+timeframe_num struct AP_bloom_stat stat; + struct apbm_mempool *mp; }; //return 1 if the bit is already set, 0 if it was not set -inline static int test_bit_set_bit(unsigned char *buf, uint64_t x, int mode) { - uint64_t byte = x >> 3; - uint8_t mask = 1 << (x % 8); - uint8_t c = buf[byte]; // expensive memory access - +inline static int set_bit(unsigned char *buf, uint32_t bit_index, uint32_t byte_index) { + uint8_t mask = 1 << (bit_index % 8); + uint8_t c = buf[byte_index]; // expensive memory access, maybe trigger page-fault events if (c & mask) { return 1; } else { - if (mode == MODE_WRITE) { - buf[byte] = c | mask; - } + buf[byte_index] = c | mask; return 0; } } +inline static int test_bit(const unsigned char *buf, uint32_t bit_index, uint32_t byte_index) { + uint8_t mask = 1 << (bit_index % 8); + uint8_t c = buf[byte_index]; // expensive memory access, maybe trigger page-fault events + return c & mask; +} + struct ap_slice { - int hash_index; + unsigned int hash_index; int popcount; //the number of 1-bits in the slice - int slice_size; //in bytes + unsigned int slice_size; //in bytes struct timeval last_insert; struct timeval first_insert; unsigned char * data; @@ -86,12 +89,13 @@ struct ap_slice }; static struct ap_slice *ap_slice_new(struct AP_bloom *bloom, int slice_size, int hash_index) { - struct ap_slice *slice = ALLOC(struct ap_slice, 1); + struct ap_slice *slice = APMOOL_ALLOC(bloom->mp, struct ap_slice, 1); slice->slice_size = slice_size; assert(slice_size % sizeof(unsigned long long) == 0); //ap_slice_chain_deserialize may alloc slice_size=0 if(slice_size > 0){ - slice->data = ALLOC(unsigned char, slice_size); + slice->data = APMOOL_ALLOC(bloom->mp, unsigned char, slice_size); + memset(slice->data, 0, slice_size); } slice->hash_index = hash_index; slice->popcount = 0; @@ -102,9 +106,9 @@ static struct ap_slice *ap_slice_new(struct AP_bloom *bloom, int slice_size, int } static void ap_slice_free(struct AP_bloom *bloom, struct ap_slice *slice) { - free(slice->data); - slice->data = NULL; - free(slice); + APMOOL_FREE(bloom->mp, slice->data); + // slice->data = NULL; + APMOOL_FREE(bloom->mp, slice); bloom->stat.slice_free_num_acc++; bloom->stat.slice_num_rt--; } @@ -155,126 +159,43 @@ static struct ap_slice * ap_slice_duplicate(struct AP_bloom *bloom, const struct #define HASH_TO_OFFSET(hash, slice) ((hash.a + slice->hash_index * hash.b) % (slice->slice_size<<3)) void ap_slice_add_hash(struct ap_slice *slice, const struct double_hash *hash, struct timeval now) { - int index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3); - __builtin_prefetch(&slice->data[index>>3], 1); - int added = !test_bit_set_bit(slice->data, index, MODE_WRITE); + unsigned int bit_index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3); + unsigned int byte_index = bit_index>>3; + __builtin_prefetch(&slice->data[byte_index], 1); + int added = !set_bit(slice->data, bit_index, byte_index); + slice->popcount += added; slice->last_insert = now; - if(slice->popcount == 0) + if(unlikely(slice->popcount == 0)) { slice->first_insert = now; } - slice->popcount += added; return; } int ap_slice_check_hash(const struct ap_slice *slice, const struct double_hash *hash) { - int index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3); - __builtin_prefetch(&slice->data[index>>3], 0); - return test_bit_set_bit(slice->data, index, MODE_READ); + unsigned int bit_index=double_hash_generate(hash, slice->hash_index, slice->slice_size<<3); + unsigned int byte_index = bit_index>>3; + __builtin_prefetch(&slice->data[byte_index], 0); + return test_bit(slice->data, bit_index, byte_index); } -struct ap_slice_event -{ - struct timeval timestamp; - int slice_size; - int hash_index; - int event;//1: start, -1: end -}; -int slice_event_cmp(const void *a, const void *b) -{ - const struct ap_slice_event *ea = (struct ap_slice_event*)a; - const struct ap_slice_event *eb = (struct ap_slice_event*)b; - if(timercmp(&ea->timestamp, &eb->timestamp, <)) - { - return -1; - } - if(timercmp(&ea->timestamp, &eb->timestamp, >)) - { - return 1; - } - return eb->event - ea->event; -} -UT_icd slice_event_icd = {sizeof(struct ap_slice_event), NULL, NULL, NULL}; -struct ap_state -{ - int consecutive_matches; - int visited_slices; - int hash_num; - UT_array slice_time_events; -}; -static inline void ap_state_init(struct ap_state *state, int hash_num) -{ - state->consecutive_matches=0; - state->visited_slices=0; - state->hash_num=hash_num; - utarray_init(&state->slice_time_events, &slice_event_icd); - utarray_reserve(&state->slice_time_events, hash_num*2); -} -static inline void ap_state_clear(struct ap_state *state) -{ - state->consecutive_matches=0; - state->visited_slices=0; - utarray_clear(&state->slice_time_events); -} -static inline void ap_state_done(struct ap_state *state) -{ - utarray_done(&state->slice_time_events); -} -int ap_state_is_match(struct ap_state *state) -{ - int counter=0; - struct ap_slice_event *ev=NULL; - if(state->consecutive_matches >= state->hash_num) - { - if(state->visited_slices==state->consecutive_matches)//fastpath for no slice expansion - { - return 1; - } - utarray_sort(&state->slice_time_events, slice_event_cmp); - while((ev=(struct ap_slice_event*)utarray_next(&state->slice_time_events, ev))) - { - counter += ev->event; - if(counter == state->hash_num) - { - return 1; - } - } - } - return 0; -} -static void ap_slice_chain_check_hash(const struct ap_slice *head, const struct double_hash *hash, struct ap_state *state) + +static int ap_slice_chain_check_hash(const struct ap_slice *head, const struct double_hash *hash) { + TIME_RECORD(); //In a stackable (scalable) Bloom filter, checking for membership now involves inspecting each layer for presence. const struct ap_slice *slice=NULL; - struct ap_slice_event ev; int found=0; LL_FOREACH(head, slice) { if(ap_slice_check_hash(slice, hash)) { - ev.timestamp = slice->first_insert; - ev.event = 1; - ev.slice_size=slice->slice_size; - ev.hash_index=slice->hash_index; - utarray_push_back(&state->slice_time_events, &ev); - ev.timestamp = slice->last_insert; - ev.event = -1; - ev.slice_size = slice->slice_size; - ev.hash_index = slice->hash_index; - utarray_push_back(&state->slice_time_events, &ev); found=1; + break; //quick path } - state->visited_slices++; - } - if(found) - { - state->consecutive_matches++; } - else - { - ap_state_clear(state); - } - return; + TIME_DIFF(); + return found; } /* Return: @@ -503,20 +424,26 @@ struct AP_bloom *AP_bloom_new(struct timeval now, double error_rate, long long c bloom->cfg.last_cfg = now; bloom->cfg.time_slice_num = time_slice_num; - bloom->last_slide = now; bloom->cursor = 0; bloom->default_slice_size = ceil(capacity/log(2)); bloom->default_slice_size = ceil((double)bloom->default_slice_size/64)*64/8; + if(bloom->default_slice_size % MEM_ALIGN_SIZE() != 0){ + bloom->default_slice_size = (bloom->default_slice_size / MEM_ALIGN_SIZE() + 1) * MEM_ALIGN_SIZE(); + } + assert(0 == bloom->default_slice_size % MEM_ALIGN_SIZE()); +#if APBM_USE_MEMPOOL + bloom->mp = apbm_mempool_new(bloom->default_slice_size, MEM_POOL_INIT_BLOCK_NUM, 128, MEM_POOL_INIT_BLOCK_NUM); +#endif bloom->chain_num = bloom->hash_num + bloom->cfg.time_slice_num; - bloom->heads = ALLOC(struct ap_slice*, bloom->chain_num); + bloom->heads = APMOOL_ALLOC(bloom->mp, struct ap_slice*, bloom->chain_num); for(int i=0; i<bloom->chain_num; i++) { bloom->heads[i] = ap_slice_new(bloom, bloom->default_slice_size, i % bloom->chain_num); } bloom->stat.expand_max_multiple_rt = 1; bloom->stat.slice_num_rt = bloom->chain_num; - + bloom->last_slide = now; return bloom; } @@ -530,18 +457,20 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now) { return; } + TIME_RECORD(); long long elapse_us=timeval_delta_us(bloom->cfg.last_cfg, now); long long epoches=elapse_us/slide_time_frame_us; slide_us -= slide_us % slide_time_frame_us; int n_slide = slide_us/slide_time_frame_us; - + int reset_hash_index; if(n_slide < bloom->cfg.time_slice_num) { for(int i=0; i<n_slide; i++) { int reset_idx = (bloom->cursor + bloom->hash_num) % chain_num; - struct ap_slice *slice = ap_slice_new(bloom, bloom->default_slice_size, bloom->heads[reset_idx]->hash_index); + reset_hash_index = bloom->heads[reset_idx]->hash_index; ap_slice_chain_free(bloom, bloom->heads[reset_idx]); + struct ap_slice *slice = ap_slice_new(bloom, bloom->default_slice_size, reset_hash_index); bloom->heads[reset_idx] = slice; bloom->cursor = (bloom->cursor + 1) % chain_num; } @@ -550,26 +479,30 @@ static void slide_time(struct AP_bloom *bloom, struct timeval now) { for(int i=0; i<chain_num; i++) { - struct ap_slice *new_slice=ap_slice_new(bloom, bloom->default_slice_size, bloom->heads[i]->hash_index); + reset_hash_index = bloom->heads[i]->hash_index; ap_slice_chain_free(bloom, bloom->heads[i]); + struct ap_slice *new_slice=ap_slice_new(bloom, bloom->default_slice_size, reset_hash_index); bloom->heads[i] = new_slice; } bloom->cursor = (bloom->cursor + n_slide) % chain_num; } + struct timeval slide_time; slide_time.tv_sec = slide_us/1000/1000; slide_time.tv_usec = slide_us%(1000*1000); timeradd(&bloom->last_slide, &slide_time, &bloom->last_slide); assert(bloom->cursor == (epoches % chain_num) || bloom->cursor == (epoches % chain_num + 1) % chain_num); + TIME_DIFF(); return; } void AP_bloom_add(struct AP_bloom *bloom, struct timeval now, const char *buffer, int len) { - struct double_hash hash; - double_hash_init(&hash, buffer, len); - if(bloom->cfg.time_window_ms) + TIME_RECORD(); + struct double_hash hash; + double_hash_init(hash, buffer, (size_t)len); + if(bloom->cfg.time_slice_num && bloom->cfg.time_window_ms) { slide_time(bloom, now); } @@ -578,6 +511,7 @@ void AP_bloom_add(struct AP_bloom *bloom, struct timeval now, const char *buffer int idx=(bloom->cursor+i) % (bloom->chain_num); ap_slice_chain_add_hash(bloom,&(bloom->heads[idx]), &hash, now); } + TIME_DIFF(); return; } @@ -587,29 +521,34 @@ int AP_bloom_check(const struct AP_bloom *bloom, struct timeval now, const char { return 0; } + TIME_RECORD(); struct double_hash hash; - double_hash_init(&hash, buffer, len); + double_hash_init(hash, buffer, (size_t)len); int chain_num = bloom->chain_num; - - struct ap_state state; - ap_state_init(&state, bloom->hash_num); - // int max_num = bloom->hash_num+chain_num; + int match_num = 0, unmatch_num = 0; for(int i = 0; i < chain_num; i++) { - long long delta_us = timeval_delta_us(bloom->heads[i%chain_num]->last_insert, now); - if(unlikely((delta_us > bloom->cfg.time_window_ms*1000) && bloom->cfg.time_window_ms)) + long long delta_us = timeval_delta_us(bloom->heads[i]->last_insert, now); + if(unlikely((delta_us > bloom->cfg.time_window_ms*1000) + && bloom->cfg.time_window_ms)) { - ap_state_clear(&state); continue; } - ap_slice_chain_check_hash(bloom->heads[i%chain_num], &hash, &state); - if(ap_state_is_match(&state)) - { - ap_state_done(&state); - return 1; + if(ap_slice_chain_check_hash(bloom->heads[i], &hash)){ + match_num++; + }else{ + unmatch_num++; + } + if(unmatch_num > (chain_num - bloom->hash_num)){ + return 0; //quick path for check false } } - ap_state_done(&state); + if(match_num >= bloom->hash_num) + { + TIME_DIFF(); + return 1; + } + TIME_DIFF(); return 0; } void AP_bloom_free(struct AP_bloom *bloom) @@ -619,8 +558,11 @@ void AP_bloom_free(struct AP_bloom *bloom) ap_slice_chain_free(bloom, bloom->heads[i]); bloom->heads[i]=NULL; } - free(bloom->heads); - bloom->heads=NULL; + APMOOL_FREE(bloom->mp, bloom->heads); + +#if APBM_USE_MEMPOOL + apbm_mempool_destroy(bloom->mp); +#endif free(bloom); } size_t AP_bloom_mem_size(const struct AP_bloom *bloom) @@ -672,7 +614,7 @@ void AP_bloom_serialize(const struct AP_bloom *bloom, char **blob, size_t *blob_ } struct AP_bloom * AP_bloom_deserialize(const char *blob, size_t blob_sz) { - struct AP_bloom *bloom=ALLOC(struct AP_bloom, 1); + struct AP_bloom *bloom=ALLOC(struct AP_bloom, 1); size_t offset=0; memcpy(bloom, blob+offset, offsetof(struct AP_bloom, heads)); offset += offsetof(struct AP_bloom, heads); |
