summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorlijia <[email protected]>2024-05-11 17:56:37 +0800
committeryangwei <[email protected]>2024-05-17 14:10:21 +0800
commit0f7206776498b621de13255189cf1c659c04a701 (patch)
treef7d749e6f4c0796e164de4fea15e60e4b9b6f49a /src
parentb497224a3738a8c55768915288eafdeb97573d66 (diff)
TSG-20808,TSG-20954:split bm into multi-partition to avoid long latency by memset large memory;Both bm are queried in transition
Diffstat (limited to 'src')
-rw-r--r--src/common/sapp_mem.c40
-rw-r--r--src/config/config_parse.cpp25
-rw-r--r--src/dealpkt/duplicate_pkt_distinguish.c165
-rw-r--r--src/sapp_dev/sapp_init.c2
-rw-r--r--src/support/ap_bloom/src/ap_bloom.c8
-rw-r--r--src/support/dablooms/CMakeLists.txt2
-rwxr-xr-xsrc/support/dablooms/src/dablooms.c481
-rwxr-xr-xsrc/support/dablooms/src/dablooms.h44
-rw-r--r--src/support/dablooms/src/mem_util.c101
-rw-r--r--src/support/dablooms/src/mem_util.h22
-rw-r--r--src/support/dablooms/src/test_mem_util.c63
11 files changed, 690 insertions, 263 deletions
diff --git a/src/common/sapp_mem.c b/src/common/sapp_mem.c
index e921de0..aeb6b36 100644
--- a/src/common/sapp_mem.c
+++ b/src/common/sapp_mem.c
@@ -49,9 +49,10 @@ static const mem_stat_name_tuple_t mem_stat_name_tuple[] =
{SAPP_MEM_DYN_PADDR , "layer_addr"},
{SAPP_MEM_DYN_DETAIN_PKT , "detain_pkt"},
{SAPP_MEM_DYN_SID_LIST , "segment_id_list"},
+ {SAPP_MEM_DYN_BLOOM_FILTER , "bloom_filter"},
};
-void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size)
+void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, unsigned int size)
{
char *ptr;
sapp_private_mem_t *mhdr;
@@ -82,7 +83,7 @@ void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size)
}
-void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, int size)
+void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, unsigned int size)
{
char *ptr;
sapp_private_mem_t *mhdr;
@@ -137,7 +138,7 @@ void sapp_mem_free(sapp_mem_type_t type, int thread_seq, void *data)
free((void *)mhdr);
}
-void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, int size)
+void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, unsigned int size)
{
void *new_ptr;
sapp_private_mem_t *mhdr;
@@ -258,7 +259,7 @@ void sapp_mem_stat_output(void)
}
for(stat_index = __SAPP_FIX_DYN_SEPARATOR + 1 ; stat_index < __SAPP_MEM_TYPE_MAX; stat_index++){
- if(dyn_realtime_stat.mem_used_block[stat_index] > 0){
+ if(dyn_realtime_stat.mem_used_block[stat_index] != 0){
fprintf(fp, "%7s %18s %18lld %18lld %18s\n",
"",
mem_stat_name_tuple[stat_index].mem_type_string,
@@ -287,6 +288,37 @@ void sapp_mem_stat_output(void)
fclose(fp);
}
+char bloomfilter_mem_stat_polling_entry(struct streaminfo *nouse1, void **nouse2, int thread_seq, void *nouse3)
+{
+ static time_t last_get_time[SAPP_MAX_THREADS] = {};
+
+ if(ABBR_CURRENT_TIME <= last_get_time[thread_seq])
+ {
+ return POLLING_STATE_IDLE;
+ }
+
+ long long blocks = 0, bytes = 0;
+ bloomfilter_get_mem_stat(thread_seq, &blocks, &bytes);
+
+ sapp_global_val->mthread_volatile[thread_seq]->mem_used_stat.mem_used_block[SAPP_MEM_DYN_BLOOM_FILTER] = blocks;
+ sapp_global_val->mthread_volatile[thread_seq]->mem_used_stat.mem_used_bytes[SAPP_MEM_DYN_BLOOM_FILTER] = bytes;
+
+ last_get_time[thread_seq] = ABBR_CURRENT_TIME;
+ return POLLING_STATE_WORK;
+}
+
+int sapp_mem_init(void)
+{
+ if(sapp_global_val->config.packet_io.dup_pkt_para.kickout_udp_stream_enabled
+ || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_tcp
+ || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_udp
+ || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject)
+ {
+ stream_register_fun(FUN_TYPE_POLLING, (char (*)(void))bloomfilter_mem_stat_polling_entry, 0);
+ }
+ return 0;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/src/config/config_parse.cpp b/src/config/config_parse.cpp
index 6b72ea4..a6d1277 100644
--- a/src/config/config_parse.cpp
+++ b/src/config/config_parse.cpp
@@ -1319,6 +1319,22 @@ static int config_sanity_check(void)
return -1;
}
+ if (pconfig->packet_io.dup_pkt_para.kickout_udp_stream_enabled
+ || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_tcp
+ || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_udp
+ || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject)
+ {
+ if (pconfig->packet_io.dup_pkt_para.bloom_partition_num <= 0)
+ {
+ sapp_log(RLOG_LV_FATAL, ~0, ~0, "bloom_partition_num is:%d, must be greater than 0 !", pconfig->packet_io.dup_pkt_para.bloom_partition_num);
+ return -1;
+ }
+
+ if(pconfig->packet_io.dup_pkt_para.transition_time_ms > pconfig->packet_io.dup_pkt_para.bloom_timeout_ms/2){
+ sapp_log(RLOG_LV_INFO, ~0, ~0, "bloom_transition_time is greater than bloom_timeout/2\n");
+ }
+ }
+
/******************************* CPU ********************************/
cur_cpu_num = get_nprocs();
if(cur_cpu_num < pconfig->cpu.worker_threads){
@@ -1735,13 +1751,18 @@ int sapp_parse_config(void)
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.traffic.inject", (char *)"inject_all_enabled", &pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject, 0);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_capacity", &pconfig->packet_io.dup_pkt_para.bloom_capacity, 1000000);
- tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_timeout", &pconfig->packet_io.dup_pkt_para.bloom_timeout, 60);
+ int tmp_dm_timeout;
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_timeout", &tmp_dm_timeout, 60 * 1000);
+ pconfig->packet_io.dup_pkt_para.bloom_timeout_ms = (long)tmp_dm_timeout * 1000;
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_transition_time", &tmp_dm_timeout, 3);
+ pconfig->packet_io.dup_pkt_para.transition_time_ms = (long)tmp_dm_timeout * 1000;
+
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_slice_num", &pconfig->packet_io.dup_pkt_para.bloom_slice_num, 3);
tomlc99_wrap_load_string_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_error_rate", str_tmp, sizeof(str_tmp), "0.000001");
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_library", (int *)&pconfig->packet_io.dup_pkt_para.bloom_library, 1);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"first_packets", (int *)&pconfig->packet_io.dup_pkt_para.first_packets, 3);
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_partition_num", (int *)&pconfig->packet_io.dup_pkt_para.bloom_partition_num, 1);
pconfig->packet_io.dup_pkt_para.bloom_error_rate = strtod(str_tmp, NULL);
-
/******************************* packet_io.under_ddos ******************************/
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"packet_io.under_ddos", (char *)"stream_bypass_enabled", &pconfig->packet_io.under_ddos_config.enabled, 0); //��ǰ����, Ĭ�ϲ�����
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"packet_io.under_ddos", (char *)"get_cpu_usage_interval", &pconfig->packet_io.under_ddos_config.get_cpu_usage_interval, 500);
diff --git a/src/dealpkt/duplicate_pkt_distinguish.c b/src/dealpkt/duplicate_pkt_distinguish.c
index 1a904b6..fa7a64c 100644
--- a/src/dealpkt/duplicate_pkt_distinguish.c
+++ b/src/dealpkt/duplicate_pkt_distinguish.c
@@ -12,35 +12,41 @@ extern "C" {
识别因路由策略导致的重复流量, 首次收到数据包加入bloom filter,
每个流的前N个包扫描bloom filter, 重复的数据包直接转发即可.
- ip层的ttl, hop不能算为key, 因为经过了几跳路由器又回绕了, ttl肯定会变化, 而上层的数据包确实是重复的.
+ ip层的ttl, hop不能算为key, 因为经过了几跳路由器又回绕了, ttl肯定会变�?, 而上层的数据包确实是重复�?.
*/
/*
2021-05-18 lijia close ipv6 protocol:
重复流量识别bloom filter句柄, 根据流量方向和协议的不同, 功能分为三类,
- IPv6因为没有Ipid字段, 无法区别真正的应用层重传和重复流量, 开启了会导致断网或者CT, 所以不支持!!!
+ IPv6因为没有Ipid字段, 无法区别真正的应用层重传和重复流�?, 开启了会导致断网或者CT, 所以不支持!!!
- 对于IPv6的两种情况:
- 1)如果开启了代理策略, 重传包被识别成了重复包的话, sapp就直接PASS了, 实际那个包是真实通信双方发的, 没有经过tfe处理, 可能就断网了.
- 2)如果开启了firewall的drop策略, 第一次drop成功了, 但应用层会重传, 重传包被识别成了重复包的话, sapp就直接PASS了, 导致CT.
+ 对于IPv6的两种情�?:
+ 1)如果开启了代理策略, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 实际那个包是真实通信双方发的, 没有经过tfe处理, 可能就断网了.
+ 2)如果开启了firewall的drop策略, 第一次drop成功�?, 但应用层会重�?, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 导致CT.
*/
+/* 24.04, split bloomfilter into multiple partition to avoid blocking caused by memset large memory */
+struct bloom_partition
+{
+ void **bm_handle;
+ int bm_partition_num;
+};
-static inline void *bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now)
+static inline void *_bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
{
switch(dup_conf->bloom_library)
{
case BLOOM_LIBRARY_APBLOOM:
- return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout*1000, dup_conf->bloom_slice_num);
+ return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout_ms, dup_conf->bloom_slice_num);
case BLOOM_LIBRARY_DABLOOM:
- return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now.tv_sec, dup_conf->bloom_timeout);
+ return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now_ms, dup_conf->bloom_timeout_ms, dup_conf->transition_time_ms);
default:
return NULL;
}
}
-static inline void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf)
+static inline void _bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf)
{
switch(dup_conf->bloom_library)
{
@@ -54,7 +60,56 @@ static inline void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf
return;
}
-static inline int bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now)
+void bloom_free_partition(void *bloom_filter, const sapp_dup_pkt_t *dup_conf)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter;
+ int i;
+ for(i = 0; i < bm_partition->bm_partition_num; i++){
+ if(bm_partition->bm_handle[i]){
+ _bloom_free(bm_partition->bm_handle[i], dup_conf);
+ }
+ }
+ free(bm_partition);
+ return;
+}
+
+void *bloom_new_partition(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
+{
+ int i;
+ struct bloom_partition *bm_partition = (struct bloom_partition *)calloc(1, sizeof(struct bloom_partition));
+ bm_partition->bm_handle = (void **)calloc(dup_conf->bloom_partition_num, sizeof(void *));
+
+ long base_timeout = dup_conf->bloom_timeout_ms;
+ int polarity = 1;
+ sapp_dup_pkt_t local_dup_conf = {};
+ memcpy(&local_dup_conf, dup_conf, sizeof(sapp_dup_pkt_t));
+ srand(time(NULL));
+ for(i = 0; i < dup_conf->bloom_partition_num; i++){
+ local_dup_conf.bloom_timeout_ms = base_timeout + (random() % 1000) * polarity; // spread out expire time
+ polarity *= -1;
+ if(dup_conf->bloom_timeout_ms < 0){
+ local_dup_conf.bloom_timeout_ms = base_timeout;
+ }
+ bm_partition->bm_handle[i] = _bloom_new(&local_dup_conf, now, now_ms);
+ if(NULL == bm_partition->bm_handle[i]){
+ break;
+ }
+ }
+ if(i != dup_conf->bloom_partition_num){
+ for(i = 0; i < dup_conf->bloom_partition_num; i++){
+ if(bm_partition->bm_handle[i]){
+ _bloom_free(bm_partition->bm_handle[i], dup_conf);
+ }
+ }
+ free(bm_partition);
+ return NULL;
+ }
+ bm_partition->bm_partition_num = dup_conf->bloom_partition_num;
+ return bm_partition;
+}
+
+
+static inline int _bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
{
switch(dup_conf->bloom_library)
{
@@ -63,13 +118,29 @@ static inline int bloom_check(void *bloom_filter, const char *key, int key_len,
now, (const char *)key, key_len);
case BLOOM_LIBRARY_DABLOOM:
return expiry_dablooms_search((struct expiry_dablooms_handle *)bloom_filter,
- (const char *)key, key_len, (time_t)now.tv_sec);
+ (const char *)key, key_len, now_ms);
default:
return 0;
}
}
-static inline void bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now)
+static inline unsigned int bloom_parttion_kiss_hash(const unsigned char *key, int key_len)
+{
+ unsigned int hash = 127;
+ for(int i = 0; i < key_len; i++){
+ hash += (unsigned int )key[i];
+ }
+ return (hash & 0x7ffffff);
+}
+
+int bloom_check_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter;
+ int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num;
+ return _bloom_check(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms);
+}
+
+static inline void _bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
{
switch(dup_conf->bloom_library)
{
@@ -78,7 +149,7 @@ static inline void bloom_add(void *bloom_filter, const char *key, int key_len, c
now, (const char *)key, key_len);
case BLOOM_LIBRARY_DABLOOM:
expiry_dablooms_add((struct expiry_dablooms_handle *)bloom_filter,
- (const char *)key, key_len, (time_t)now.tv_sec);
+ (const char *)key, key_len, now_ms);
break;
default:
break;
@@ -86,6 +157,52 @@ static inline void bloom_add(void *bloom_filter, const char *key, int key_len, c
return;
}
+void bloom_add_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter;
+ int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num;
+ return _bloom_add(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms);
+}
+
+int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_bytes)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)sapp_global_val->mthread_volatile[tseq]->dup_pkt_distinguish_handle;
+
+ if(NULL == bm_partition){
+ *mem_blocks = 0;
+ *mem_bytes = 0;
+ return 0;
+ }
+
+ switch (sapp_global_val->config.packet_io.dup_pkt_para.bloom_library)
+ {
+ case BLOOM_LIBRARY_APBLOOM:
+ // todo,
+ *mem_blocks = 0;
+ *mem_bytes = 0;
+ return -1;
+ break;
+ case BLOOM_LIBRARY_DABLOOM:
+ {
+ long long tmp_blocks = 0;
+ long long tmp_bytes = 0;
+ *mem_blocks = 0;
+ *mem_bytes = 0;
+ for (int i = 0; i < bm_partition->bm_partition_num; i++)
+ {
+ expiry_dablooms_get_mem_stat(bm_partition->bm_handle[i], &tmp_blocks, &tmp_bytes);
+ *mem_blocks += tmp_blocks;
+ *mem_bytes += tmp_bytes;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
int sapp_dup_stream_search(struct streaminfo *a_stream)
{
void *key=NULL;
@@ -102,7 +219,7 @@ int sapp_dup_stream_search(struct streaminfo *a_stream)
}
struct timeval now={g_CurrentTime, 0};
- return bloom_check(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ return bloom_check_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
}
@@ -121,7 +238,7 @@ void sapp_dup_stream_add(struct streaminfo *a_stream)
key_len=sizeof(struct stream_tuple4_v6);
}
struct timeval now={g_CurrentTime, 0};
- bloom_add(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ bloom_add_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
}
static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_type_t stream_type, const void *iphdr,
@@ -141,8 +258,8 @@ static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_t
if (stream_type == STREAM_TYPE_UDP)
{
const struct mesa_udp_hdr *uhdr = (const struct mesa_udp_hdr *)l4_hdr;
- l4->tcp_seq = 0x5A5A5A5A; /* udp没有, 但设成0可能对某些hash函数不友好, 此处设个固定值 */
- l4->tcp_ack = 0xA5A5A5A5; /* udp没有, 但设成0可能对某些hash函数不友好, 此处设个固定值 */
+ l4->tcp_seq = 0x5A5A5A5A; /* udp没有, 但设�?0可能对某些hash函数不友�?, 此处设个固定�? */
+ l4->tcp_ack = 0xA5A5A5A5; /* udp没有, 但设�?0可能对某些hash函数不友�?, 此处设个固定�? */
l4->sport = uhdr->uh_sport;
l4->dport = uhdr->uh_dport;
l4->checksum = uhdr->uh_sum;
@@ -182,12 +299,12 @@ int sapp_dup_pkt_identify(int tid, struct streaminfo_private *pstream_pr,
struct timeval now={g_CurrentTime, 0};
size_t key_len = sapp_set_dup_pkt_key((enum addr_type_t)pstream_pr->stream_public.addr.addrtype, (enum stream_type_t)pstream_pr->stream_public.type, ip_hdr, (void *)l4_hdr, &dup_bloom_key);
- is_dup_pkt= bloom_check(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ is_dup_pkt= bloom_check_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
if(is_dup_pkt != 0){
pstream_pr->has_duplicate_pkt = 1;
}else{
if(need_add_bloom_filter){
- bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
}
}
return is_dup_pkt;
@@ -200,7 +317,7 @@ void sapp_dup_pkt_mark_l4(struct streaminfo *a_stream, const void *ip_hdr, const
int ret = 0;
struct timeval now={g_CurrentTime, 0};
key_len = sapp_set_dup_pkt_key((enum addr_type_t)a_stream->addr.addrtype, (enum stream_type_t)a_stream->type, ip_hdr, l4_hdr, &dup_bloom_key);
- bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
return;
}
@@ -216,12 +333,12 @@ int sapp_dup_pkt_init(void)
return 0;
}
- /* 流量入口有三个开关, 识别句柄只有一个,
- 拆分句柄多耗费内存, 且判断时每个包需要扫描多个句柄,
+ /* 流量入口有三个开�?, 识别句柄只有一�?,
+ 拆分句柄多耗费内存, 且判断时每个包需要扫描多个句�?,
可能好处只是准确率高了一点点而已 */
struct timeval now={time(NULL), 0};
for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){
- sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new(p_dup_conf, now);
+ sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new_partition(p_dup_conf, now, g_current_time_ms);
if(NULL == sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){
sapp_log(RLOG_LV_FATAL, 1, 1, "dup_pkt_distinguish init error!");
return -1;
@@ -238,7 +355,7 @@ void sapp_dup_pkt_destroy(void)
const sapp_dup_pkt_t *p_dup_conf = &sapp_global_val->config.packet_io.dup_pkt_para;
for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){
if(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){
- bloom_free(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf);
+ bloom_free_partition(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf);
}
}
diff --git a/src/sapp_dev/sapp_init.c b/src/sapp_dev/sapp_init.c
index f6ef015..5c1d247 100644
--- a/src/sapp_dev/sapp_init.c
+++ b/src/sapp_dev/sapp_init.c
@@ -303,6 +303,8 @@ int MESA_platform_init(int argc, char *argv[])
sapp_set_current_state(SAPP_STATE_PKT_IO_INITED);
+ sapp_mem_init();
+
sapp_runtime_log(RLOG_LV_DEBUG, "sapp platform init success");
return 0;
diff --git a/src/support/ap_bloom/src/ap_bloom.c b/src/support/ap_bloom/src/ap_bloom.c
index 79cf47d..4064e3e 100644
--- a/src/support/ap_bloom/src/ap_bloom.c
+++ b/src/support/ap_bloom/src/ap_bloom.c
@@ -158,12 +158,14 @@ struct ap_state
int consecutive_matches;
int visited_slices;
int hash_num;
+ int counter;
UT_array slice_time_events;
};
void ap_state_init(struct ap_state *state, int hash_num)
{
state->consecutive_matches=0;
state->visited_slices=0;
+ state->counter=0;
state->hash_num=hash_num;
utarray_init(&state->slice_time_events, &slice_event_icd);
utarray_reserve(&state->slice_time_events, hash_num*2);
@@ -172,6 +174,7 @@ void ap_state_clear(struct ap_state *state)
{
state->consecutive_matches=0;
state->visited_slices=0;
+ state->counter=0;
utarray_clear(&state->slice_time_events);
}
void ap_state_done(struct ap_state *state)
@@ -188,6 +191,7 @@ int ap_state_is_match(struct ap_state *state)
{
return 1;
}
+ if(state->counter >= state->hash_num)return 1;
utarray_sort(&state->slice_time_events, slice_event_cmp);
while((ev=(struct ap_slice_event*)utarray_next(&state->slice_time_events, ev)))
{
@@ -221,6 +225,10 @@ static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double
ev.hash_index = slice->hash_index;
utarray_push_back(&state->slice_time_events, &ev);
found=1;
+ if(timercmp(&slice->last_insert, &slice->first_insert, >))
+ {
+ state->counter++;
+ }
}
state->visited_slices++;
}
diff --git a/src/support/dablooms/CMakeLists.txt b/src/support/dablooms/CMakeLists.txt
index f7e8ecf..58369a1 100644
--- a/src/support/dablooms/CMakeLists.txt
+++ b/src/support/dablooms/CMakeLists.txt
@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 2.8...3.10)
add_definitions(-fPIC)
-add_library(libdabloom STATIC src/dablooms.c src/murmur.c) \ No newline at end of file
+add_library(libdabloom STATIC src/dablooms.c src/murmur.c src/mem_util.c) \ No newline at end of file
diff --git a/src/support/dablooms/src/dablooms.c b/src/support/dablooms/src/dablooms.c
index 9f52a26..fbdcd31 100755
--- a/src/support/dablooms/src/dablooms.c
+++ b/src/support/dablooms/src/dablooms.c
@@ -12,36 +12,39 @@
#include <unistd.h>
#include <errno.h>
#include <time.h>
+#include <assert.h>
#include "murmur.h"
#include "dablooms.h"
+#include "mem_util.h"
-#define DABLOOMS_VERSION "0.9.1"
+#define DABLOOMS_VERSION "1.0.1"
#define ERROR_TIGHTENING_RATIO 0.5
#define SALT_CONSTANT 0x97c29b3a
-#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
-#define FREE(p) {free(p);p=NULL;}
+#define ALLOC(type, number, mstat) ((type *)dabm_calloc(number, sizeof(type), mstat))
+#define FREE(p, mstat) dabm_free(p, mstat)
+#define REALLOC(p, size, mstat) dabm_realloc(p, size, mstat)
const char *dablooms_version(void)
{
return DABLOOMS_VERSION;
}
-void free_bitmap(bitmap_t *bitmap)
+static void free_bitmap(bitmap_t *bitmap, struct dabm_mem_stat *mstat)
{
#if 0
if ((munmap(bitmap->array, bitmap->bytes)) < 0) {
perror("Error, unmapping memory");
}
#else
- FREE(bitmap->array);
+ FREE(bitmap->array, mstat);
#endif
- FREE(bitmap);
+ FREE(bitmap, mstat);
}
-bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
+static bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size, struct dabm_mem_stat *mstat)
{
#if 0
@@ -74,18 +77,20 @@ bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
#else
if (bitmap->array != NULL)
{
- bitmap->array = (char *)realloc(bitmap->array, new_size);
- if (bitmap->array == NULL)
- {
- perror("Error resizing memory");
- free_bitmap(bitmap);
- return NULL;
- }
+ bitmap->array = (char *)REALLOC(bitmap->array, new_size, mstat);
+ assert(bitmap->array);
+ // if (bitmap->array == NULL)
+ // {
+ // perror("Error resizing memory");
+ // free_bitmap(bitmap, mstat);
+ // return NULL;
+ // }
memset(bitmap->array + old_size, 0, new_size - old_size);
- }
+ //printf("bitmap_resize(), %p, new:%d, old:%d\n", bitmap,(int)new_size, (int)old_size);
+ }
else
{
- bitmap->array = ALLOC(char, new_size);
+ bitmap->array = ALLOC(char, new_size, mstat);
}
#endif
bitmap->bytes = new_size;
@@ -94,22 +99,22 @@ bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
/* Create a new bitmap, not full featured, simple to give
* us a means of interacting with the 4 bit counters */
-bitmap_t *new_bitmap(size_t bytes)
+static bitmap_t *new_bitmap(size_t bytes, struct dabm_mem_stat *mstat)
{
bitmap_t *bitmap;
- bitmap = ALLOC(bitmap_t, 1);
+ bitmap = ALLOC(bitmap_t, 1, mstat);
bitmap->bytes = bytes;
- if ((bitmap = bitmap_resize(bitmap, 0, bytes)) == NULL) {
+ if ((bitmap = bitmap_resize(bitmap, 0, bytes, mstat)) == NULL) {
return NULL;
}
return bitmap;
}
-int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset)
+static inline int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset)
{
long access = index / 2 + offset;
uint8_t temp;
@@ -158,7 +163,7 @@ int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset)
}
/* decrements the four bit counter */
-int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset)
+static inline int bitmap_check(const bitmap_t *bitmap, unsigned int index, long offset)
{
long access = index / 2 + offset;
if (index % 2 != 0 ) {
@@ -168,19 +173,19 @@ int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset)
}
}
-int bitmap_flush(bitmap_t *bitmap)
-{
-#if 0
- if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) {
- perror("Error, flushing bitmap to disk");
- return -1;
- } else {
- return 0;
- }
-#else
- return 0;
-#endif
-}
+// int bitmap_flush(bitmap_t *bitmap)
+// {
+// #if 0
+// if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) {
+// perror("Error, flushing bitmap to disk");
+// return -1;
+// } else {
+// return 0;
+// }
+// #else
+// return 0;
+// #endif
+// }
/*
* Perform the actual hashing for `key`
@@ -191,36 +196,34 @@ int bitmap_flush(bitmap_t *bitmap)
* See paper by Kirsch, Mitzenmacher [2006]
* http://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf
*/
-void hash_func(counting_bloom_t *bloom, const char *key, size_t key_len, uint32_t *hashes)
+static inline void hash_func(uint32_t *hashes, const uint32_t checksum[4], size_t nfuncs, uint32_t counts_per_func)
{
- uint32_t checksum[4];
-
- MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum);
+ // uint32_t checksum[4];
+ // MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum);
uint32_t h1 = checksum[0];
uint32_t h2 = checksum[1];
-
- for (size_t i = 0; i < bloom->nfuncs; i++) {
- hashes[i] = (h1 + i * h2) % bloom->counts_per_func;
+ for (size_t i = 0; i < nfuncs; i++) {
+ hashes[i] = (h1 + i * h2) % counts_per_func;
}
}
-int free_counting_bloom(counting_bloom_t *bloom)
-{
- if (bloom != NULL) {
- FREE(bloom->hashes);
- bloom->hashes = NULL;
- free_bitmap(bloom->bitmap);
- FREE(bloom);
- bloom = NULL;
- }
- return 0;
-}
+// int free_counting_bloom(counting_bloom_t *bloom)
+// {
+// if (bloom != NULL) {
+// FREE(bloom->hashes);
+// bloom->hashes = NULL;
+// free_bitmap(bloom->bitmap);
+// FREE(bloom);
+// bloom = NULL;
+// }
+// return 0;
+// }
-counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset)
+counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset, struct dabm_mem_stat *mstat)
{
counting_bloom_t *bloom;
- bloom = ALLOC(counting_bloom_t, 1);
+ bloom = ALLOC(counting_bloom_t, 1, mstat);
bloom->bitmap = NULL;
bloom->capacity = capacity;
bloom->error_rate = error_rate;
@@ -230,30 +233,32 @@ counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate,
bloom->size = bloom->nfuncs * bloom->counts_per_func;
/* rounding-up integer divide by 2 of bloom->size */
bloom->num_bytes = ((bloom->size + 1) / 2) + sizeof(counting_bloom_header_t);
- bloom->hashes = ALLOC(uint32_t, bloom->nfuncs);
+ bloom->hashes = ALLOC(uint32_t, bloom->nfuncs, mstat);
return bloom;
}
-counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate)
+static counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate, struct dabm_mem_stat *mstat)
{
counting_bloom_t *cur_bloom;
- cur_bloom = counting_bloom_init(capacity, error_rate, 0);
- cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes);
+ cur_bloom = counting_bloom_init(capacity, error_rate, 0, mstat);
+ cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes, mstat);
cur_bloom->header = (counting_bloom_header_t *)(cur_bloom->bitmap->array);
return cur_bloom;
}
-int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len)
+static int counting_bloom_add(counting_bloom_t *bloom, const uint32_t checksum[4])
{
unsigned int index, i, offset;
unsigned int *hashes = bloom->hashes;
+ size_t nfuncs = bloom->nfuncs;
+ uint32_t counts_per_func = bloom->counts_per_func;
+
+ hash_func(hashes, checksum, nfuncs, counts_per_func);
- hash_func(bloom, s, len, hashes);
-
- for (i = 0; i < bloom->nfuncs; i++) {
- offset = i * bloom->counts_per_func;
+ for (i = 0; i < nfuncs; i++) {
+ offset = i * counts_per_func;
index = hashes[i] + offset;
bitmap_increment(bloom->bitmap, index, bloom->offset);
}
@@ -262,32 +267,34 @@ int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len)
return 0;
}
-int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len)
-{
- unsigned int index, i, offset;
- unsigned int *hashes = bloom->hashes;
+// int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len)
+// {
+// unsigned int index, i, offset;
+// unsigned int *hashes = bloom->hashes;
- hash_func(bloom, s, len, hashes);
+// hash_func(bloom, s, len, hashes);
- for (i = 0; i < bloom->nfuncs; i++) {
- offset = i * bloom->counts_per_func;
- index = hashes[i] + offset;
- bitmap_decrement(bloom->bitmap, index, bloom->offset);
- }
- bloom->header->count--;
+// for (i = 0; i < bloom->nfuncs; i++) {
+// offset = i * bloom->counts_per_func;
+// index = hashes[i] + offset;
+// bitmap_decrement(bloom->bitmap, index, bloom->offset);
+// }
+// bloom->header->count--;
- return 0;
-}
+// return 0;
+// }
-int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
+static int counting_bloom_check(const counting_bloom_t *bloom, const uint32_t checksum[4])
{
unsigned int index, i, offset;
unsigned int *hashes = bloom->hashes;
+ size_t nfuncs = bloom->nfuncs;
+ uint32_t counts_per_func = bloom->counts_per_func;
+
+ hash_func(hashes, checksum, nfuncs, counts_per_func);
- hash_func(bloom, s, len, hashes);
-
- for (i = 0; i < bloom->nfuncs; i++) {
- offset = i * bloom->counts_per_func;
+ for (i = 0; i < nfuncs; i++) {
+ offset = i * counts_per_func;
index = hashes[i] + offset;
if (!(bitmap_check(bloom->bitmap, index, bloom->offset))) {
return 0;
@@ -296,39 +303,40 @@ int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
return 1;
}
-int free_scaling_bloom(scaling_bloom_t *bloom)
+static int free_scaling_bloom(scaling_bloom_t *bloom, struct dabm_mem_stat *mstat)
{
int i;
for (i = bloom->num_blooms - 1; i >= 0; i--) {
- FREE(bloom->blooms[i]->hashes);
+ FREE(bloom->blooms[i]->hashes, mstat);
bloom->blooms[i]->hashes = NULL;
- FREE(bloom->blooms[i]);
+ FREE(bloom->blooms[i], mstat);
bloom->blooms[i] = NULL;
}
- FREE(bloom->blooms);
- free_bitmap(bloom->bitmap);
- FREE(bloom);
+ FREE(bloom->blooms, mstat);
+ free_bitmap(bloom->bitmap, mstat);
+ FREE(bloom, mstat);
return 0;
}
/* creates a new counting bloom filter from a given scaling bloom filter, with count and id */
-counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom)
+counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom, struct dabm_mem_stat *mstat)
{
long offset;
double error_rate;
counting_bloom_t *cur_bloom;
error_rate = bloom->error_rate * (pow(ERROR_TIGHTENING_RATIO, bloom->num_blooms + 1));
-
- if ((bloom->blooms = (counting_bloom_t **)realloc(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *))) == NULL) {
- fprintf(stderr, "Error, could not realloc a new bloom filter\n");
- return NULL;
- }
-
- cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes);
+ bloom->blooms = (counting_bloom_t **)REALLOC(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *), mstat);
+ assert(bloom->blooms);
+ // if (unlikely(bloom->blooms == NULL)) {
+ // fprintf(stderr, "Error, could not realloc a new bloom filter\n");
+ // return NULL;
+ // }
+
+ cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes ,mstat);
bloom->blooms[bloom->num_blooms] = cur_bloom;
- bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes);
+ bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes, mstat);
/* reset header pointer, as mmap may have moved */
bloom->header = (scaling_bloom_header_t *) bloom->bitmap->array;
@@ -351,17 +359,18 @@ uint64_t scaling_bloom_clear_seqnums(scaling_bloom_t *bloom)
{
uint64_t seqnum;
- if (bloom->header->disk_seqnum != 0) {
- // disk_seqnum cleared on disk before any other changes
- bloom->header->disk_seqnum = 0;
- bitmap_flush(bloom->bitmap);
- }
+ // if (bloom->header->disk_seqnum != 0) {
+ // // disk_seqnum cleared on disk before any other changes
+ // bloom->header->disk_seqnum = 0;
+ // // bitmap_flush(bloom->bitmap);
+ // }
+ bloom->header->disk_seqnum = 0;
seqnum = bloom->header->mem_seqnum;
bloom->header->mem_seqnum = 0;
return seqnum;
}
-int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
+static int scaling_bloom_add(scaling_bloom_t *bloom, const uint32_t checksum[4], uint64_t id, struct dabm_mem_stat *mstat)
{
int i;
uint64_t seqnum;
@@ -377,84 +386,84 @@ int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_
seqnum = scaling_bloom_clear_seqnums(bloom);
if ((id > bloom->header->max_id) && (cur_bloom->header->count >= cur_bloom->capacity - 1)) {
- cur_bloom = new_counting_bloom_from_scale(bloom);
+ cur_bloom = new_counting_bloom_from_scale(bloom, mstat);
cur_bloom->header->count = 0;
cur_bloom->header->id = bloom->header->max_id + 1;
}
if (bloom->header->max_id < id) {
bloom->header->max_id = id;
}
- counting_bloom_add(cur_bloom, s, len);
+ counting_bloom_add(cur_bloom, checksum);
bloom->header->mem_seqnum = seqnum + 1;
return 1;
}
-int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
-{
- counting_bloom_t *cur_bloom;
- int i;
- uint64_t seqnum;
+// int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
+// {
+// counting_bloom_t *cur_bloom;
+// int i;
+// uint64_t seqnum;
- for (i = bloom->num_blooms - 1; i >= 0; i--) {
- cur_bloom = bloom->blooms[i];
- if (id >= cur_bloom->header->id) {
- seqnum = scaling_bloom_clear_seqnums(bloom);
+// for (i = bloom->num_blooms - 1; i >= 0; i--) {
+// cur_bloom = bloom->blooms[i];
+// if (id >= cur_bloom->header->id) {
+// seqnum = scaling_bloom_clear_seqnums(bloom);
- counting_bloom_remove(cur_bloom, s, len);
+// counting_bloom_remove(cur_bloom, s, len);
- bloom->header->mem_seqnum = seqnum + 1;
- return 1;
- }
- }
- return 0;
-}
-
-int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len)
+// bloom->header->mem_seqnum = seqnum + 1;
+// return 1;
+// }
+// }
+// return 0;
+// }
+
+static int scaling_bloom_check(scaling_bloom_t *bloom, const uint32_t checksum[4])
{
int i;
counting_bloom_t *cur_bloom;
for (i = bloom->num_blooms - 1; i >= 0; i--) {
cur_bloom = bloom->blooms[i];
- if (counting_bloom_check(cur_bloom, s, len)) {
+ if (counting_bloom_check(cur_bloom, checksum)) {
return 1;
}
}
return 0;
}
-int scaling_bloom_flush(scaling_bloom_t *bloom)
-{
- if (bitmap_flush(bloom->bitmap) != 0) {
- return -1;
- }
- // all changes written to disk before disk_seqnum set
- if (bloom->header->disk_seqnum == 0) {
- bloom->header->disk_seqnum = bloom->header->mem_seqnum;
- return bitmap_flush(bloom->bitmap);
- }
- return 0;
-}
-
-uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom)
-{
- return bloom->header->mem_seqnum;
-}
-
-uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom)
-{
- return bloom->header->disk_seqnum;
-}
-
-scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate)
+// int scaling_bloom_flush(scaling_bloom_t *bloom)
+// {
+// if (bitmap_flush(bloom->bitmap) != 0) {
+// return -1;
+// }
+// // all changes written to disk before disk_seqnum set
+// if (bloom->header->disk_seqnum == 0) {
+// bloom->header->disk_seqnum = bloom->header->mem_seqnum;
+// return bitmap_flush(bloom->bitmap);
+// }
+// return 0;
+// }
+
+// uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom)
+// {
+// return bloom->header->mem_seqnum;
+// }
+
+// uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom)
+// {
+// return bloom->header->disk_seqnum;
+// }
+
+scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate,struct dabm_mem_stat *mstat)
{
scaling_bloom_t *bloom;
- bloom = ALLOC(scaling_bloom_t, 1);
- if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t))) == NULL) {
+ bloom = ALLOC(scaling_bloom_t, 1, mstat);
+ if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t), mstat)) == NULL) {
fprintf(stderr, "Error, Could not create bitmap with file\n");
- free_scaling_bloom(bloom);
+ free_scaling_bloom(bloom, mstat);
return NULL;
}
@@ -468,19 +477,20 @@ scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate)
return bloom;
}
-scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate)
+static scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate, struct dabm_mem_stat *mstat)
{
scaling_bloom_t *bloom;
counting_bloom_t *cur_bloom;
- bloom = scaling_bloom_init(capacity, error_rate);
-
- if (!(cur_bloom = new_counting_bloom_from_scale(bloom))) {
- fprintf(stderr, "Error, Could not create counting bloom\n");
- free_scaling_bloom(bloom);
- return NULL;
- }
+ bloom = scaling_bloom_init(capacity, error_rate, mstat);
+ cur_bloom = new_counting_bloom_from_scale(bloom, mstat);
+ // if (!()) {
+ // fprintf(stderr, "Error, Could not create counting bloom\n");
+ // free_scaling_bloom(bloom, mstat);
+ // return NULL;
+ // }
+ assert(cur_bloom);
cur_bloom->header->count = 0;
cur_bloom->header->id = 0;
@@ -492,15 +502,17 @@ scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate)
struct expiry_dablooms_handle{
scaling_bloom_t *cur_bloom;
scaling_bloom_t *next_bloom;
- time_t cur_bloom_start;
- time_t next_bloom_start;
- time_t last_bloom_check;
+ long cur_bloom_start_ms;
+ long next_bloom_start_ms;
+ long last_bloom_check_ms;
uint64_t cur_bloom_inc_id;
uint64_t next_bloom_inc_id;
unsigned int capacity;
- int expiry_time;
- time_t cur_time;
+ long expiry_time_ms;
+ long cur_time_ms;
+ long transition_time_ms;
double error_rate;
+ struct dabm_mem_stat mstat;
};
char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){
@@ -517,28 +529,29 @@ char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){
void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle){
if(handle != NULL){
if(handle->cur_bloom != NULL){
- free_scaling_bloom(handle->cur_bloom);
+ free_scaling_bloom(handle->cur_bloom, &handle->mstat);
}
if(handle->next_bloom != NULL){
- free_scaling_bloom(handle->next_bloom);
+ free_scaling_bloom(handle->next_bloom, &handle->mstat);
}
- FREE(handle);
+ free(handle);
}
}
-struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time){
- struct expiry_dablooms_handle *handle = ALLOC(struct expiry_dablooms_handle, 1);
- scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate);
+struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms){
+ struct expiry_dablooms_handle *handle = (struct expiry_dablooms_handle *)calloc(1, sizeof(struct expiry_dablooms_handle));
+ scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate, &handle->mstat);
if(cur_bloom == NULL){
goto error_out;
}
handle->cur_bloom = cur_bloom;
handle->cur_bloom_inc_id = 0;
- handle->cur_bloom_start=cur_time;
+ handle->cur_bloom_start_ms=cur_time_ms;
handle->capacity = capacity;
handle->error_rate = error_rate;
- handle->expiry_time = expiry_time;
- handle->cur_time = cur_time;
+ handle->expiry_time_ms = expiry_time_ms;
+ handle->cur_time_ms = cur_time_ms;
+ handle->transition_time_ms = transition_time_ms;
return handle;
error_out:
@@ -546,91 +559,123 @@ error_out:
return NULL;
}
-int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){
- if(handle == NULL || handle->cur_bloom == NULL){
- return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
+// int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){
+// if(handle == NULL || handle->cur_bloom == NULL){
+// return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
+// }
+// *count = handle->cur_bloom_inc_id;
+// return 0;
+// }
+
+static inline int bloom_in_transition_period(const struct expiry_dablooms_handle *handle, long cur_time_ms)
+{
+ if((cur_time_ms - handle->cur_bloom_start_ms) + handle->transition_time_ms < handle->expiry_time_ms){
+ return 0;
}
- *count = handle->cur_bloom_inc_id;
- return 0;
+ return 1;
}
-static int bloom_expired_check(struct expiry_dablooms_handle *handle, time_t cur_time){
- if(handle == NULL || handle->cur_bloom == NULL){
- return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
- }
- if(cur_time <= handle->last_bloom_check){
+static int bloom_expired_check(struct expiry_dablooms_handle *handle, long cur_time_ms, struct dabm_mem_stat *mstat){
+ // if(unlikely(handle == NULL || handle->cur_bloom == NULL)){
+ // return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
+ // }
+ if(likely(cur_time_ms <= handle->last_bloom_check_ms + 1000)){
return 0;
}
- time_t delta_time = cur_time - handle->cur_bloom_start;
- handle->cur_time=cur_time;
- if(delta_time >= handle->expiry_time){
- free_scaling_bloom(handle->cur_bloom);
+ long delta_time_ms = cur_time_ms - handle->cur_bloom_start_ms;
+ handle->cur_time_ms=cur_time_ms;
+ if(unlikely(delta_time_ms >= handle->expiry_time_ms)){
+ free_scaling_bloom(handle->cur_bloom, mstat);
if(handle->next_bloom != NULL){
handle->cur_bloom = handle->next_bloom;
- handle->cur_bloom_start = handle->next_bloom_start;
+ handle->cur_bloom_start_ms = handle->next_bloom_start_ms;
handle->cur_bloom_inc_id = handle->next_bloom_inc_id;
handle->next_bloom = NULL;
- handle->last_bloom_check=0;
+ handle->last_bloom_check_ms=0;
}
else{
- scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate);
- if(cur_bloom == NULL){
+ scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate, mstat);
+ if(unlikely(cur_bloom == NULL)){
return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL;
}
handle->cur_bloom = cur_bloom;
handle->cur_bloom_inc_id = 0;
- handle->cur_bloom_start=cur_time;
- handle->last_bloom_check=0;
+ handle->cur_bloom_start_ms=cur_time_ms;
+ handle->last_bloom_check_ms=0;
}
}
else
{
- handle->last_bloom_check=cur_time;
+ handle->last_bloom_check_ms=cur_time_ms;
}
return 0;
}
-int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time){
- if(key==NULL || len ==0 || handle == NULL)
- {
- return -1;
- }
- int ret = bloom_expired_check(handle, cur_time);
- if(ret < 0)
+int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){
+ assert(!(key==NULL || len ==0 || handle == NULL));
+ int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat);
+ if(unlikely(ret < 0))
{
return ret;
}
+ __builtin_prefetch(key, 0, 0);
+ uint32_t checksum[4];
+ MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum);
+
+ long delta_time_ms = cur_time_ms - handle->cur_bloom_start_ms;
+ handle->cur_time_ms=cur_time_ms;
- scaling_bloom_add(handle->cur_bloom, key, len, handle->cur_bloom_inc_id);
- handle->cur_bloom_inc_id++;
- time_t delta_time = cur_time - handle->cur_bloom_start;
- handle->cur_time=cur_time;
- if(delta_time >= handle->expiry_time){
- if(handle->next_bloom == NULL){
- scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate);
+ int is_transition = bloom_in_transition_period(handle, cur_time_ms);
+ if(unlikely(is_transition != 0)){
+ if(unlikely(handle->next_bloom == NULL)){
+ scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate, &handle->mstat);
if(next_bloom == NULL){
return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL;
}
handle->next_bloom = next_bloom;
handle->next_bloom_inc_id = 0;
- handle->next_bloom_start=cur_time;
+ handle->next_bloom_start_ms=cur_time_ms;
}
- scaling_bloom_add(handle->next_bloom, key, len, handle->next_bloom_inc_id);
+ scaling_bloom_add(handle->next_bloom, checksum, handle->next_bloom_inc_id, &handle->mstat);
handle->next_bloom_inc_id++;
+ }else{
+ scaling_bloom_add(handle->cur_bloom, checksum, handle->cur_bloom_inc_id, &handle->mstat);
+ handle->cur_bloom_inc_id++;
}
+
return 0;
}
-int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time){
- if(key==NULL || len ==0 || handle == NULL)
- {
- return -1;
- }
- int ret = bloom_expired_check(handle, cur_time);
- if(ret < 0)
+int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){
+ assert(!(key==NULL || len ==0 || handle == NULL));
+ int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat);
+ if(unlikely(ret < 0))
{
return ret;
}
- int bloom_hit = scaling_bloom_check(handle->cur_bloom, key, len);
- return bloom_hit;
+
+ __builtin_prefetch(key, 0, 0);
+ uint32_t checksum[4];
+ MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum);
+
+ int bloom_hit_cur = 0;
+ int bloom_hit_next = 0;
+ int is_transition = bloom_in_transition_period(handle, cur_time_ms);
+
+ if(unlikely(is_transition != 0) && (handle->next_bloom != NULL)){
+ bloom_hit_next = scaling_bloom_check(handle->next_bloom, checksum);
+ }
+
+ if(likely(0 == bloom_hit_next)){
+ bloom_hit_cur = scaling_bloom_check(handle->cur_bloom, checksum);
+ }
+
+ return bloom_hit_cur || bloom_hit_next;
}
+
+int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes)
+{
+ *blocks = handle->mstat.blocks;
+ *bytes = handle->mstat.bytes;
+ return 0;
+} \ No newline at end of file
diff --git a/src/support/dablooms/src/dablooms.h b/src/support/dablooms/src/dablooms.h
index 017fb53..f93a9a0 100755
--- a/src/support/dablooms/src/dablooms.h
+++ b/src/support/dablooms/src/dablooms.h
@@ -5,6 +5,18 @@
#include <stdint.h>
#include <stdlib.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef likely
+#define likely(x) __builtin_expect(!!(x), 1)
+#endif
+
+#ifndef unlikely
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#endif
+
const char *dablooms_version(void);
typedef struct {
@@ -13,15 +25,15 @@ typedef struct {
} bitmap_t;
-bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
-bitmap_t *new_bitmap(size_t bytes);
+// bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
+// bitmap_t *new_bitmap(size_t bytes);
-int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
+// int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset);
-int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset);
+// int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_flush(bitmap_t *bitmap);
-void free_bitmap(bitmap_t *bitmap);
+// void free_bitmap(bitmap_t *bitmap);
typedef struct {
uint64_t id;
@@ -43,11 +55,11 @@ typedef struct {
bitmap_t *bitmap;
} counting_bloom_t;
-int free_counting_bloom(counting_bloom_t *bloom);
-counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate);
-int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
+// int free_counting_bloom(counting_bloom_t *bloom);
+// counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate);
+// int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len);
-int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
+// int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
typedef struct {
uint64_t max_id;
@@ -65,11 +77,11 @@ typedef struct {
bitmap_t *bitmap;
} scaling_bloom_t;
-scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
-int free_scaling_bloom(scaling_bloom_t *bloom);
-int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
+// scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
+// int free_scaling_bloom(scaling_bloom_t *bloom);
+// int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
-int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
+// int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
int scaling_bloom_flush(scaling_bloom_t *bloom);
uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom);
uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom);
@@ -81,9 +93,13 @@ enum expiry_dablooms_errno{
};
char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno);
void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle);
-struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time);
+struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms);
int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count);
int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
+int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes);
+#ifdef __cplusplus
+}
+#endif
#endif
diff --git a/src/support/dablooms/src/mem_util.c b/src/support/dablooms/src/mem_util.c
new file mode 100644
index 0000000..0978018
--- /dev/null
+++ b/src/support/dablooms/src/mem_util.c
@@ -0,0 +1,101 @@
+#include <sys/stat.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <math.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include <pthread.h>
+#include "murmur.h"
+#include "dablooms.h"
+#include "mem_util.h"
+
+struct dabm_mem_hdr{
+ size_t size;
+ char *user_ptr;
+};
+
+void *dabm_malloc(size_t user_size, struct dabm_mem_stat *mstat)
+{
+ size_t real_size = user_size + sizeof(struct dabm_mem_hdr);
+ char *real_ptr = (char *)malloc(real_size);
+ if (real_ptr)
+ {
+ struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)real_ptr;
+ stat->size = real_size;
+ stat->user_ptr = real_ptr + sizeof(struct dabm_mem_hdr);
+ mstat->blocks++;
+ mstat->bytes += real_size;
+ return stat->user_ptr;
+ }
+ return NULL;
+}
+
+void *dabm_calloc(size_t nmemb, size_t user_size, struct dabm_mem_stat *mstat)
+{
+ size_t real_size = nmemb * user_size + sizeof(struct dabm_mem_hdr);
+ char *real_ptr = (char *)calloc(1, real_size);
+ if (real_ptr)
+ {
+ struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)real_ptr;
+ stat->size = real_size;
+ stat->user_ptr = real_ptr + sizeof(struct dabm_mem_hdr);
+ mstat->blocks++;
+ mstat->bytes += real_size;
+ return stat->user_ptr;
+ }
+ return NULL;
+}
+
+void dabm_free(void *user_ptr, struct dabm_mem_stat *mstat)
+{
+ if (user_ptr)
+ {
+ struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)((char *)user_ptr - sizeof(struct dabm_mem_hdr));
+ mstat->blocks--;
+ mstat->bytes -= stat->size;
+ free(stat);
+ }
+}
+
+void *dabm_realloc(void *user_ptr, size_t new_size, struct dabm_mem_stat *mstat)
+{
+ char *old_real_ptr;
+ struct dabm_mem_hdr *stat;
+
+ if (user_ptr)
+ {
+ stat = (struct dabm_mem_hdr *)((char *)user_ptr - sizeof(struct dabm_mem_hdr));
+ old_real_ptr = (char *)stat;
+ mstat->bytes -= stat->size;
+ }
+ else
+ {
+ old_real_ptr = NULL;
+ mstat->blocks++; // no memory alloc before
+ }
+
+ size_t real_size = new_size + sizeof(struct dabm_mem_hdr);
+ char *new_real_ptr = (char *)realloc(old_real_ptr, real_size);
+ if (new_real_ptr)
+ {
+ mstat->bytes += real_size;
+ stat = (struct dabm_mem_hdr *)new_real_ptr;
+ stat->size = real_size;
+ stat->user_ptr = new_real_ptr + sizeof(struct dabm_mem_hdr);
+ return stat->user_ptr;
+ }
+
+ return NULL;
+}
+
+int dabm_get_mem_hdr_size(void)
+{
+ return sizeof(struct dabm_mem_hdr);
+}
+
diff --git a/src/support/dablooms/src/mem_util.h b/src/support/dablooms/src/mem_util.h
new file mode 100644
index 0000000..f038392
--- /dev/null
+++ b/src/support/dablooms/src/mem_util.h
@@ -0,0 +1,22 @@
+#ifndef _DABM_MEM_UTIL_H_
+#define _DABM_MEM_UTIL_H_ 1
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+ struct dabm_mem_stat
+ {
+ long blocks;
+ long bytes;
+ };
+
+ void *dabm_malloc(size_t size, struct dabm_mem_stat *mstat);
+ void *dabm_calloc(size_t nmemb, size_t size, struct dabm_mem_stat *mstat);
+ void dabm_free(void *user_ptr, struct dabm_mem_stat *mstat);
+ void *dabm_realloc(void *ptr, size_t new_size, struct dabm_mem_stat *mstat);
+ int dabm_get_mem_hdr_size(void);
+#ifdef __cplusplus
+}
+#endif
+#endif \ No newline at end of file
diff --git a/src/support/dablooms/src/test_mem_util.c b/src/support/dablooms/src/test_mem_util.c
new file mode 100644
index 0000000..f8176d1
--- /dev/null
+++ b/src/support/dablooms/src/test_mem_util.c
@@ -0,0 +1,63 @@
+#include <sys/stat.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <math.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <time.h>
+#include "murmur.h"
+#include "dablooms.h"
+#include "mem_util.h"
+
+int main(int argc, char const *argv[])
+{
+ int i;
+ struct dabm_mem_stat mstat = {};
+
+ char *ptr = (char *)dabm_malloc(100, &mstat);
+ assert(ptr != NULL);
+
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+ dabm_free(ptr, &mstat);
+ assert(mstat.blocks == 0);
+ assert(mstat.bytes == 0);
+
+ int *iptr = (int *)dabm_calloc(100, sizeof(int), &mstat);
+ assert(iptr != NULL);
+ for (i = 0; i < 100; i++)
+ {
+ assert(iptr[i] == 0);
+ }
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+ dabm_free(iptr, &mstat);
+
+ ptr = dabm_realloc(NULL, 100, &mstat);
+ assert(ptr != NULL);
+
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+ memset(ptr, 0xAA, 100);
+
+ unsigned char *uptr = (unsigned char *)dabm_realloc(ptr, 1000, &mstat);
+ for (i = 0; i < 100; i++)
+ {
+ assert(uptr[i] == 0xAA);
+ }
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+
+ dabm_free(uptr, &mstat);
+ assert(mstat.blocks == 0);
+ assert(mstat.bytes == 0);
+
+ printf("test mem util success!\n");
+ return 0;
+}