diff options
| author | lijia <[email protected]> | 2024-05-12 18:46:57 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-17 14:10:21 +0800 |
| commit | 8b783b250ebcbc0ca2dae3393ad8ffab83dfcadf (patch) | |
| tree | 4ac0d31113256564fc0e6170ea41f08718449b7d /src | |
| parent | 0f7206776498b621de13255189cf1c659c04a701 (diff) | |
feat: The DABF partition feature changed to built in.
Diffstat (limited to 'src')
| -rw-r--r-- | src/dealpkt/duplicate_pkt_distinguish.c | 120 | ||||
| -rwxr-xr-x | src/support/dablooms/src/dablooms.c | 116 | ||||
| -rwxr-xr-x | src/support/dablooms/src/dablooms.h | 2 |
3 files changed, 112 insertions, 126 deletions
diff --git a/src/dealpkt/duplicate_pkt_distinguish.c b/src/dealpkt/duplicate_pkt_distinguish.c index fa7a64c..820642d 100644 --- a/src/dealpkt/duplicate_pkt_distinguish.c +++ b/src/dealpkt/duplicate_pkt_distinguish.c @@ -26,27 +26,20 @@ extern "C" { 2)如果开启了firewall的drop策略, 第一次drop成功�?, 但应用层会重�?, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 导致CT. */ -/* 24.04, split bloomfilter into multiple partition to avoid blocking caused by memset large memory */ -struct bloom_partition -{ - void **bm_handle; - int bm_partition_num; -}; - -static inline void *_bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) +void *bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) { switch(dup_conf->bloom_library) { case BLOOM_LIBRARY_APBLOOM: return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout_ms, dup_conf->bloom_slice_num); case BLOOM_LIBRARY_DABLOOM: - return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now_ms, dup_conf->bloom_timeout_ms, dup_conf->transition_time_ms); + return expiry_dablooms_init(dup_conf->bloom_partition_num, dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now_ms, dup_conf->bloom_timeout_ms, dup_conf->transition_time_ms); default: return NULL; } } -static inline void _bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf) +void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf) { switch(dup_conf->bloom_library) { @@ -60,56 +53,8 @@ static inline void _bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_con return; } -void bloom_free_partition(void *bloom_filter, const sapp_dup_pkt_t *dup_conf) -{ - struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter; - int i; - for(i = 0; i < bm_partition->bm_partition_num; i++){ - if(bm_partition->bm_handle[i]){ - _bloom_free(bm_partition->bm_handle[i], dup_conf); - } - } - free(bm_partition); - return; -} -void *bloom_new_partition(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) -{ - int i; - struct bloom_partition *bm_partition = (struct bloom_partition *)calloc(1, sizeof(struct bloom_partition)); - bm_partition->bm_handle = (void **)calloc(dup_conf->bloom_partition_num, sizeof(void *)); - - long base_timeout = dup_conf->bloom_timeout_ms; - int polarity = 1; - sapp_dup_pkt_t local_dup_conf = {}; - memcpy(&local_dup_conf, dup_conf, sizeof(sapp_dup_pkt_t)); - srand(time(NULL)); - for(i = 0; i < dup_conf->bloom_partition_num; i++){ - local_dup_conf.bloom_timeout_ms = base_timeout + (random() % 1000) * polarity; // spread out expire time - polarity *= -1; - if(dup_conf->bloom_timeout_ms < 0){ - local_dup_conf.bloom_timeout_ms = base_timeout; - } - bm_partition->bm_handle[i] = _bloom_new(&local_dup_conf, now, now_ms); - if(NULL == bm_partition->bm_handle[i]){ - break; - } - } - if(i != dup_conf->bloom_partition_num){ - for(i = 0; i < dup_conf->bloom_partition_num; i++){ - if(bm_partition->bm_handle[i]){ - _bloom_free(bm_partition->bm_handle[i], dup_conf); - } - } - free(bm_partition); - return NULL; - } - bm_partition->bm_partition_num = dup_conf->bloom_partition_num; - return bm_partition; -} - - -static inline int _bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) +int bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) { switch(dup_conf->bloom_library) { @@ -124,23 +69,7 @@ static inline int _bloom_check(void *bloom_filter, const char *key, int key_len, } } -static inline unsigned int bloom_parttion_kiss_hash(const unsigned char *key, int key_len) -{ - unsigned int hash = 127; - for(int i = 0; i < key_len; i++){ - hash += (unsigned int )key[i]; - } - return (hash & 0x7ffffff); -} - -int bloom_check_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) -{ - struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter; - int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num; - return _bloom_check(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms); -} - -static inline void _bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) +void bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) { switch(dup_conf->bloom_library) { @@ -157,18 +86,11 @@ static inline void _bloom_add(void *bloom_filter, const char *key, int key_len, return; } -void bloom_add_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) -{ - struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter; - int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num; - return _bloom_add(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms); -} - int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_bytes) { - struct bloom_partition *bm_partition = (struct bloom_partition *)sapp_global_val->mthread_volatile[tseq]->dup_pkt_distinguish_handle; + void *bm_handle = (struct bloom_partition *)sapp_global_val->mthread_volatile[tseq]->dup_pkt_distinguish_handle; - if(NULL == bm_partition){ + if(NULL == bm_handle){ *mem_blocks = 0; *mem_bytes = 0; return 0; @@ -183,19 +105,9 @@ int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_byt return -1; break; case BLOOM_LIBRARY_DABLOOM: - { - long long tmp_blocks = 0; - long long tmp_bytes = 0; - *mem_blocks = 0; - *mem_bytes = 0; - for (int i = 0; i < bm_partition->bm_partition_num; i++) - { - expiry_dablooms_get_mem_stat(bm_partition->bm_handle[i], &tmp_blocks, &tmp_bytes); - *mem_blocks += tmp_blocks; - *mem_bytes += tmp_bytes; - } - } + expiry_dablooms_get_mem_stat((struct expiry_dablooms_handle *)bm_handle, mem_blocks, mem_bytes); break; + default: break; } @@ -219,7 +131,7 @@ int sapp_dup_stream_search(struct streaminfo *a_stream) } struct timeval now={g_CurrentTime, 0}; - return bloom_check_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); + return bloom_check(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); } @@ -238,7 +150,7 @@ void sapp_dup_stream_add(struct streaminfo *a_stream) key_len=sizeof(struct stream_tuple4_v6); } struct timeval now={g_CurrentTime, 0}; - bloom_add_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); + bloom_add(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); } static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_type_t stream_type, const void *iphdr, @@ -299,12 +211,12 @@ int sapp_dup_pkt_identify(int tid, struct streaminfo_private *pstream_pr, struct timeval now={g_CurrentTime, 0}; size_t key_len = sapp_set_dup_pkt_key((enum addr_type_t)pstream_pr->stream_public.addr.addrtype, (enum stream_type_t)pstream_pr->stream_public.type, ip_hdr, (void *)l4_hdr, &dup_bloom_key); - is_dup_pkt= bloom_check_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); + is_dup_pkt= bloom_check(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); if(is_dup_pkt != 0){ pstream_pr->has_duplicate_pkt = 1; }else{ if(need_add_bloom_filter){ - bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); + bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); } } return is_dup_pkt; @@ -317,7 +229,7 @@ void sapp_dup_pkt_mark_l4(struct streaminfo *a_stream, const void *ip_hdr, const int ret = 0; struct timeval now={g_CurrentTime, 0}; key_len = sapp_set_dup_pkt_key((enum addr_type_t)a_stream->addr.addrtype, (enum stream_type_t)a_stream->type, ip_hdr, l4_hdr, &dup_bloom_key); - bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); + bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); return; } @@ -338,7 +250,7 @@ int sapp_dup_pkt_init(void) 可能好处只是准确率高了一点点而已 */ struct timeval now={time(NULL), 0}; for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){ - sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new_partition(p_dup_conf, now, g_current_time_ms); + sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new(p_dup_conf, now, g_current_time_ms); if(NULL == sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){ sapp_log(RLOG_LV_FATAL, 1, 1, "dup_pkt_distinguish init error!"); return -1; @@ -355,7 +267,7 @@ void sapp_dup_pkt_destroy(void) const sapp_dup_pkt_t *p_dup_conf = &sapp_global_val->config.packet_io.dup_pkt_para; for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){ if(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){ - bloom_free_partition(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf); + bloom_free(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf); } } diff --git a/src/support/dablooms/src/dablooms.c b/src/support/dablooms/src/dablooms.c index fbdcd31..2c563da 100755 --- a/src/support/dablooms/src/dablooms.c +++ b/src/support/dablooms/src/dablooms.c @@ -17,7 +17,7 @@ #include "dablooms.h" #include "mem_util.h" -#define DABLOOMS_VERSION "1.0.1" +#define DABLOOMS_VERSION "1.0.2" #define ERROR_TIGHTENING_RATIO 0.5 #define SALT_CONSTANT 0x97c29b3a @@ -498,8 +498,7 @@ static scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_ra return bloom; } - -struct expiry_dablooms_handle{ +struct expiry_dablooms_handle_entity{ scaling_bloom_t *cur_bloom; scaling_bloom_t *next_bloom; long cur_bloom_start_ms; @@ -515,6 +514,13 @@ struct expiry_dablooms_handle{ struct dabm_mem_stat mstat; }; +/* 24.05, split bloomfilter into multiple partition to avoid blocking caused by memset large memory */ +struct expiry_dablooms_handle{ + struct expiry_dablooms_handle_entity **bm_handle; + int bm_partition_num; +}; + + char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){ switch(_errno){ case EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL: @@ -526,7 +532,7 @@ char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){ } } -void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle){ +static void expiry_dablooms_destroy_entity(struct expiry_dablooms_handle_entity *handle){ if(handle != NULL){ if(handle->cur_bloom != NULL){ free_scaling_bloom(handle->cur_bloom, &handle->mstat); @@ -538,8 +544,16 @@ void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle){ } } -struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms){ - struct expiry_dablooms_handle *handle = (struct expiry_dablooms_handle *)calloc(1, sizeof(struct expiry_dablooms_handle)); +void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle){ + for (size_t i = 0; i < handle->bm_partition_num; i++) + { + expiry_dablooms_destroy_entity(handle->bm_handle[i]); + } +} + + +static struct expiry_dablooms_handle_entity* expiry_dablooms_init_entity(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms){ + struct expiry_dablooms_handle_entity *handle = (struct expiry_dablooms_handle_entity *)calloc(1, sizeof(struct expiry_dablooms_handle_entity)); scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate, &handle->mstat); if(cur_bloom == NULL){ goto error_out; @@ -555,19 +569,52 @@ struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, doubl return handle; error_out: + expiry_dablooms_destroy_entity(handle); + return NULL; +} + +struct expiry_dablooms_handle* expiry_dablooms_init(int partition_num, unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms) +{ + struct expiry_dablooms_handle *handle = (struct expiry_dablooms_handle *)calloc(1, sizeof(struct expiry_dablooms_handle)); + handle->bm_partition_num = partition_num; + handle->bm_handle = (struct expiry_dablooms_handle_entity **)calloc(partition_num, sizeof(struct expiry_dablooms_handle_entity *)); + + long spread_expire_timeout; + srand(time(NULL)); + int polarity = 1; + for (size_t i = 0; i < partition_num; i++) + { + spread_expire_timeout = expiry_time_ms + (random() % 1000) * polarity; // spread out expire time + if(spread_expire_timeout <= 0){ + spread_expire_timeout = expiry_time_ms; + } + polarity *= -1; + handle->bm_handle[i] = expiry_dablooms_init_entity(capacity, error_rate, cur_time_ms, spread_expire_timeout, transition_time_ms); + if(handle->bm_handle[i] == NULL){ + goto error_out; + } + } + return handle; + +error_out: expiry_dablooms_destroy(handle); return NULL; } -// int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){ -// if(handle == NULL || handle->cur_bloom == NULL){ -// return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; -// } -// *count = handle->cur_bloom_inc_id; -// return 0; -// } +int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){ + uint64_t tot_count = 0; + if(handle == NULL || handle->bm_handle == NULL){ + return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; + } + for (int i = 0; i < handle->bm_partition_num; i++) + { + tot_count += handle->bm_handle[i]->cur_bloom_inc_id; + } + *count = tot_count; + return 0; +} -static inline int bloom_in_transition_period(const struct expiry_dablooms_handle *handle, long cur_time_ms) +static inline int bloom_in_transition_period(const struct expiry_dablooms_handle_entity *handle, long cur_time_ms) { if((cur_time_ms - handle->cur_bloom_start_ms) + handle->transition_time_ms < handle->expiry_time_ms){ return 0; @@ -575,7 +622,7 @@ static inline int bloom_in_transition_period(const struct expiry_dablooms_handle return 1; } -static int bloom_expired_check(struct expiry_dablooms_handle *handle, long cur_time_ms, struct dabm_mem_stat *mstat){ +static int bloom_expired_check(struct expiry_dablooms_handle_entity *handle, long cur_time_ms, struct dabm_mem_stat *mstat){ // if(unlikely(handle == NULL || handle->cur_bloom == NULL)){ // return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; // } @@ -611,14 +658,13 @@ static int bloom_expired_check(struct expiry_dablooms_handle *handle, long cur_t return 0; } -int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){ +static int expiry_dablooms_add_entity(struct expiry_dablooms_handle_entity *handle, const char *key, size_t len, long cur_time_ms){ assert(!(key==NULL || len ==0 || handle == NULL)); int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat); if(unlikely(ret < 0)) { return ret; } - __builtin_prefetch(key, 0, 0); uint32_t checksum[4]; MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum); @@ -646,7 +692,23 @@ int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, return 0; } -int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){ +/* Keep It Simple Stupid */ +static inline unsigned int bloom_parttion_kiss_hash(const unsigned char *key, size_t key_len) +{ + unsigned int hash = 127; + for(size_t i = 0; i < key_len; i++){ + hash += (unsigned int )key[i]; + } + return (hash & 0x7ffffff); +} + +int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){ + __builtin_prefetch(key, 0, 0); + int index = (int)bloom_parttion_kiss_hash((unsigned char *)key, len) % handle->bm_partition_num; + return expiry_dablooms_add_entity(handle->bm_handle[index], key, len, cur_time_ms); +} + +static int expiry_dablooms_search_entity(struct expiry_dablooms_handle_entity *handle, const char *key, size_t len, long cur_time_ms){ assert(!(key==NULL || len ==0 || handle == NULL)); int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat); if(unlikely(ret < 0)) @@ -654,7 +716,6 @@ int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *ke return ret; } - __builtin_prefetch(key, 0, 0); uint32_t checksum[4]; MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum); @@ -673,9 +734,22 @@ int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *ke return bloom_hit_cur || bloom_hit_next; } +int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){ + __builtin_prefetch(key, 0, 0); + int index = (int)bloom_parttion_kiss_hash((unsigned char *)key, len) % handle->bm_partition_num; + return expiry_dablooms_search_entity(handle->bm_handle[index], key, len, cur_time_ms); +} + int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes) { - *blocks = handle->mstat.blocks; - *bytes = handle->mstat.bytes; + long long tot_blocks = 0; + long long tot_bytes = 0; + for (int i = 0; i < handle->bm_partition_num; i++) + { + tot_blocks += handle->bm_handle[i]->mstat.blocks; + tot_bytes += handle->bm_handle[i]->mstat.bytes; + } + *blocks = tot_blocks; + *bytes = tot_bytes; return 0; }
\ No newline at end of file diff --git a/src/support/dablooms/src/dablooms.h b/src/support/dablooms/src/dablooms.h index f93a9a0..85886b4 100755 --- a/src/support/dablooms/src/dablooms.h +++ b/src/support/dablooms/src/dablooms.h @@ -93,7 +93,7 @@ enum expiry_dablooms_errno{ }; char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno); void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle); -struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms); +struct expiry_dablooms_handle* expiry_dablooms_init(int partition_num, unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms); int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count); int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time); int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time); |
