summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlijia <[email protected]>2024-05-11 17:56:37 +0800
committeryangwei <[email protected]>2024-05-17 14:10:21 +0800
commit0f7206776498b621de13255189cf1c659c04a701 (patch)
treef7d749e6f4c0796e164de4fea15e60e4b9b6f49a
parentb497224a3738a8c55768915288eafdeb97573d66 (diff)
TSG-20808,TSG-20954:split bm into multi-partition to avoid long latency by memset large memory;Both bm are queried in transition
-rw-r--r--benchmark/sapp_default_config/etc/sapp.toml2
-rw-r--r--bin/etc/sapp.toml2
-rw-r--r--include/private/duplicate_pkt_distinguish.h8
-rw-r--r--include/private/sapp_global_val.h42
-rw-r--r--include/private/sapp_mem.h10
-rw-r--r--include/private/stream_internal.h3
-rw-r--r--module_test/src/CMakeLists.txt6
-rw-r--r--module_test/src/gtest_main.cpp11
-rw-r--r--module_test/src/gtest_sapp_bloom.cpp269
-rw-r--r--module_test/src/gtest_sapp_fun.h1
-rw-r--r--module_test/src/gtest_sapp_support.cpp20
-rw-r--r--module_test/src/gtest_sapp_support_plug.cpp11
-rw-r--r--src/common/sapp_mem.c40
-rw-r--r--src/config/config_parse.cpp25
-rw-r--r--src/dealpkt/duplicate_pkt_distinguish.c165
-rw-r--r--src/sapp_dev/sapp_init.c2
-rw-r--r--src/support/ap_bloom/src/ap_bloom.c8
-rw-r--r--src/support/dablooms/CMakeLists.txt2
-rwxr-xr-xsrc/support/dablooms/src/dablooms.c481
-rwxr-xr-xsrc/support/dablooms/src/dablooms.h44
-rw-r--r--src/support/dablooms/src/mem_util.c101
-rw-r--r--src/support/dablooms/src/mem_util.h22
-rw-r--r--src/support/dablooms/src/test_mem_util.c63
23 files changed, 1017 insertions, 321 deletions
diff --git a/benchmark/sapp_default_config/etc/sapp.toml b/benchmark/sapp_default_config/etc/sapp.toml
index 0c1c32e..2805521 100644
--- a/benchmark/sapp_default_config/etc/sapp.toml
+++ b/benchmark/sapp_default_config/etc/sapp.toml
@@ -133,6 +133,8 @@
# 0:disable bloom filter, 1:dabloom, 2:apbloom
#bloom_library=1
first_packets=3
+ bloom_partition_num=1
+
[STREAM]
### note, stream_id_base_time format is "%Y-%m-%d %H:%M:%S"
diff --git a/bin/etc/sapp.toml b/bin/etc/sapp.toml
index 0c1c32e..50fbfa5 100644
--- a/bin/etc/sapp.toml
+++ b/bin/etc/sapp.toml
@@ -133,6 +133,8 @@
# 0:disable bloom filter, 1:dabloom, 2:apbloom
#bloom_library=1
first_packets=3
+ bloom_partition_num=16
+ bloom_transition_time=3
[STREAM]
### note, stream_id_base_time format is "%Y-%m-%d %H:%M:%S"
diff --git a/include/private/duplicate_pkt_distinguish.h b/include/private/duplicate_pkt_distinguish.h
index dffd6e8..2a437cf 100644
--- a/include/private/duplicate_pkt_distinguish.h
+++ b/include/private/duplicate_pkt_distinguish.h
@@ -4,10 +4,10 @@
#include "mesa_net.h"
#include "stream_internal.h"
-/* 网络序, network order */
+/* 网络�?, network order */
-/* ipv4和ipv6分别定义, 因为ipv6地址巨大, 比例也相对较少,
- 统一使用union定义, 都会按最大长度计算, 影响性能.
+/* ipv4和ipv6分别定义, 因为ipv6地址巨大, 比例也相对较�?,
+ 统一使用union定义, 都会按最大长度计�?, 影响性能.
*/
struct sapp_dup_key_ip4_tuple{
unsigned int sip;
@@ -29,7 +29,7 @@ struct __sapp_dup_pkt_key_l4{
struct __sapp_dup_pkt_key_v4{
struct __sapp_dup_pkt_key_l4 l4;
- unsigned short ip_id; /* TCP层SYN重传时, 或者应用层重传数据, ipid是不一样的, 其实不能算为重复包, 所以ipid也作为Key */
+ unsigned short ip_id; /* TCP层SYN重传�?, 或者应用层重传数据, ipid是不一样的, 其实不能算为重复�?, 所以ipid也作为Key */
struct sapp_dup_key_ip4_tuple ip_addr;
}__attribute__((packed, aligned(1)));
diff --git a/include/private/sapp_global_val.h b/include/private/sapp_global_val.h
index b4b41da..80caa54 100644
--- a/include/private/sapp_global_val.h
+++ b/include/private/sapp_global_val.h
@@ -18,7 +18,7 @@ enum pkt_dump_mode{
/*
- vxlan��vlan_flipping�ǿ��Թ����?,
+ vxlan��vlan_flipping�ǿ��Թ����?,
vxlan������������,
vlan_flipping���ڱ����Լ�����,
*/
@@ -42,7 +42,7 @@ typedef struct{
typedef struct{
int worker_threads;
- int send_only_threads_max_num; /* �������߳��������?, �Dz����̬������?, sapp�����޷�Ԥ֪, ֻ��������ֵ */
+ int send_only_threads_max_num; /* �������߳��������?, �Dz����̬������?, sapp�����޷�Ԥ֪, ֻ��������ֵ */
int bind_mask_array_num;
long bind_mask_array[SAPP_MAX_THREADS];
long bind_mask; /* bind_mask_array�е�ÿ��������Ϊbit_indexת�ɵ�����ֵ */
@@ -117,7 +117,7 @@ typedef struct{
int timeout;
int meaningful_statistics_minimum_pkt;
int meaningful_statistics_minimum_byte;
- unsigned short *well_known_ports_array; /* �����½�����ȷ��server��, ����4001->8000, ʵ������OICQ��C2S�������ݰ�,֮ǰ�İ汾�ᱻ��ʶ��ΪS2C�����? */
+ unsigned short *well_known_ports_array; /* �����½�����ȷ��server��, ����4001->8000, ʵ������OICQ��C2S�������ݰ�,֮ǰ�İ汾�ᱻ��ʶ��ΪS2C�����? */
int well_known_ports_array_num;
int max_timeouts_per_sec;
int max_opening_per_sec;
@@ -161,9 +161,9 @@ typedef struct{
int process_latency_refresh_interval_s;
int process_latency_threshold_in_us; /* �����ļ��ﵥλ��microsecond, ��Ҫ����1000 */
/* ��ȷģʽ������, fs2����ͳ��Ҳ�dz���CPU, ÿ������Ҫ����FS_operate(),
- ���ͣ�þ�ȷģ�?, ����fs2���DZ������һ��ʱ�������?, ��Сֵ����һ��ʱ���ƽ���?, ������FS_operate()����,
+ ���ͣ�þ�ȷģ�?, ����fs2���DZ������һ��ʱ�������?, ��Сֵ����һ��ʱ���ƽ���?, ������FS_operate()����,
��ȷ�Ƚ�����һЩ, ���������˵���FS_operate()�Ĵ���.
- ����pkt_latency_accurate_enable��ƽ��ֵ�����ǶԵ�, ����׼��Ͳ�׼��?.
+ ����pkt_latency_accurate_enable��ƽ��ֵ�����ǶԵ�, ����׼��Ͳ�׼��?.
*/
int process_latency_clock_id;
@@ -191,7 +191,7 @@ enum __sapp_config_packet_io_deployment_mode{
SAPP_CFG_PKT_IO_DEPOLY_MODE_TRANSPARENT,
};
-#define MAX_VLAN_FLIPPING_MAP_NUM (4096) /* vlan_id���Χ��?0~4095, ���鳤����4096 */
+#define MAX_VLAN_FLIPPING_MAP_NUM (4096) /* vlan_id���Χ��?0~4095, ���鳤����4096 */
typedef struct{
unsigned short couple_vlan_id; /* vlan flipping�dzɶԴ���, �����±��ʾһ��vlanid, Ԫ�ص�ֵ����һ��vlanid, host order */
@@ -221,9 +221,11 @@ typedef struct{
int dup_pkt_distinguish_ipv4_tcp;
int dup_pkt_distinguish_ipv4_udp;
int bloom_capacity;
- int bloom_timeout;
int bloom_slice_num;
+ long bloom_timeout_ms;
enum bloom_library bloom_library;
+ int bloom_partition_num;
+ long transition_time_ms;
double bloom_error_rate;
int first_packets;
}sapp_dup_pkt_t;
@@ -242,17 +244,17 @@ typedef struct{
sapp_config_packet_io_tunnel_t packet_io_tunnel;
const char *input_bpf_filter;
char deployment_mode_str[NAME_MAX]; /* [mirror, inline, transparent, dumpfile] */
- enum sapp_deploment_mode_t deployment_mode_bin; /* ��������ڴ���?, ת����ֵ��ʽ, ��1:mirror; 2:transparent; 3:inline��, ���?: enum sapp_deploment_mode_t */
+ enum sapp_deploment_mode_t deployment_mode_bin; /* ��������ڴ���?, ת����ֵ��ʽ, ��1:mirror; 2:transparent; 3:inline��, ���?: enum sapp_deploment_mode_t */
sapp_config_packet_io_dev_t internal;
sapp_config_packet_io_dev_t external;
char pcap_dumpfile_name[NAME_MAX];
int polling_priority; /* call sapp_recv_pkt every call polling_entry times, ���ö��ٴ�polling�����һ��recv pkt, 1��ʾ�������ȼ���ͬ */
int work_percent;
int without_usleep; /* ���ܲ���ģʽ, �������usleep, CPU��100% */
- int inbound_route_dir; /* ��ʾ�뾳, I2C�����ֵ��?0����1 */
+ int inbound_route_dir; /* ��ʾ�뾳, I2C�����ֵ��?0����1 */
char pcap_capture_direction[NAME_MAX]; /* in, out, inout */
char inject_pkt_mode_string[NAME_MAX];
- int inject_pkt_mode; /* ��ע�뷽ʽ������ֵ, ���?: enum send_fake_packet_mode */
+ int inject_pkt_mode; /* ��ע�뷽ʽ������ֵ, ���?: enum send_fake_packet_mode */
int inject_pkt_prepend_segment_id;
unsigned short inject_mode_inline_device_sport; /* udp socket�󶨵ı���Դ�˿�, host order, ������ */
char inject_mode_single_gateway_device[NAME_MAX];
@@ -306,7 +308,7 @@ typedef struct{
const char *cfg_file_inline_dev_relative;
const char *cfg_file_inline_dev_absolute;
- const char *cfg_file_necessary_plug_relative; /* ��Ҫ����б�?, �������ʧ��sapp���˳� */
+ const char *cfg_file_necessary_plug_relative; /* ��Ҫ����б�?, �������ʧ��sapp���˳� */
const char *cfg_file_necessary_plug_absolute;
const char *cfg_file_stream_compare_layer_relative;
@@ -357,7 +359,7 @@ typedef struct{
typedef struct{
int ipv6_decapsulation_enabled; /* �Ƿ����ipv6���ݰ� */
- int ipv6_send_packet_enabled; /* �Ƿ�֧�ַ���ipv6���ݰ�, ��������ϵͳipv6�ں�ģ���Ƿ����? */
+ int ipv6_send_packet_enabled; /* �Ƿ�֧�ַ���ipv6���ݰ�, ��������ϵͳipv6�ں�ģ���Ƿ����? */
int tcp_drop_pure_ack_pkt; /* ����û�и��صĴ�ack��, ���Խ�Լһ��������ѯ, ����ҵ���������� */
int tcp_syn_option_parse_enabled; /* �Ƿ����tcp syn��ͷ��ѡ�� */
int skip_not_ip_layer_over_eth; /* ������ip��, ��֤�ڲ���ģʽ��, ����ͨ��ϵͳ·�ɷ���rst�� */
@@ -422,15 +424,15 @@ typedef struct{
pthread_t thread_timer_loop_id;
int cpu_bind_core_id_per_thread[SAPP_MAX_THREADS]; /* ���������bind_mask, ��¼ÿ��IO�����̰߳󶨵�cpu core id */
void *breakpad;
- char overlay_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾ��ԭʼ���Ļ����Ϸ�װ������, ��vxlan, ��Щ�㲻Ӧ�õ��ò��?, ��עʱ��Ҫ���⴦�� */
- char prune_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾע���ʱ��Ҫ�����IJ�?, ���ڴ�������������ļ�?: skip_not_iop_layer, ����vlan, mpls��, ����mirrorģʽ��, ������ע��㲻һ�������绷��? */
+ char overlay_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾ��ԭʼ���Ļ����Ϸ�װ������, ��vxlan, ��Щ�㲻Ӧ�õ��ò��?, ��עʱ��Ҫ���⴦�� */
+ char prune_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾע���ʱ��Ҫ�����IJ�?, ���ڴ�������������ļ�?: skip_not_iop_layer, ����vlan, mpls��, ����mirrorģʽ��, ������ע��㲻һ�������绷��? */
void *under_ddos_handle;
pthread_t gdev_keepalive_log_thread_id;
}sapp_gval_individual_fixed_t;
typedef struct{
- unsigned long long count[SAPP_STAT_NUM_MAX]; /* ������, �����?, �½�������, ������������ */
+ unsigned long long count[SAPP_STAT_NUM_MAX]; /* ������, �����?, �½�������, ������������ */
unsigned long long length[SAPP_STAT_NUM_MAX]; /* ������, �����ݰ����ֽ��� */
unsigned long long count_per_layer[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX];
unsigned long long length_per_layer[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX];
@@ -457,7 +459,7 @@ struct __sapp_gval_mthread{
sapp_fuzzy_latency_stat_t fuzzy_pkt_latency_stat_per_thread;
sapp_fuzzy_latency_stat_t fuzzy_pkt_latency_stat_per_entry_per_thread[SAPP_MAX_PLUG_ENTRY_NUM];
const raw_pkt_t *raw_pkt; /* ��ΪMESA_fakepacket_send_xxxϵ�к���û��raw_pkt����, Ҳû��stream_info, Ϊ����ǰ����, �����޸Ľӿ�, ����ȫ�ֱ������ڴ洢��ǰ�� */
- void *dup_pkt_distinguish_handle; /* ����������ڿ���?, inject, ipv4_tcp, ipv4_udp, ��ͬʹ��һ��bloom���? */
+ void *dup_pkt_distinguish_handle; /* ����������ڿ���?, inject, ipv4_tcp, ipv4_udp, ��ͬʹ��һ��bloom���? */
volatile int destory_env_done; //0:do nothing; 1:doing; 2:done
sapp_mem_used_stat_t mem_used_stat;
}__attribute__ ((aligned (64))); /* for multi-thread, must 64byte alignment */
@@ -476,16 +478,16 @@ typedef struct{
sapp_cmd_args_val_t **cmd_args_array;
int cmd_args_num;
char **dumpfie_list_array; /* ����--dumpfile-list����ִ�к�, �õ����ļ��б�, �ַ���ָ������ */
- int slient_mode; /* ��Ĭģʽ, ǰ̨������κ����? */
+ int slient_mode; /* ��Ĭģʽ, ǰ̨������κ����? */
}sapp_cla_t;
typedef struct{
sapp_cla_t cla;
sapp_config_t config;
- sapp_gval_individual_fixed_t individual_fixed; /* ��ʼ�����ٸı�ı���? */
- sapp_gval_individual_volatile_t *individual_volatile; /* �����������ʵʱ���µĶ�������? */
- sapp_gval_mthread_t *mthread_volatile[SAPP_MAX_THREADS]; /* �����������ʵʱ���µĶ��̱߳���? */
+ sapp_gval_individual_fixed_t individual_fixed; /* ��ʼ�����ٸı�ı���? */
+ sapp_gval_individual_volatile_t *individual_volatile; /* �����������ʵʱ���µĶ�������? */
+ sapp_gval_mthread_t *mthread_volatile[SAPP_MAX_THREADS]; /* �����������ʵʱ���µĶ��̱߳���? */
}sapp_global_t;
extern embed_layer_t g_stream_compare_layer_set;
diff --git a/include/private/sapp_mem.h b/include/private/sapp_mem.h
index d68a0ac..b6b6ca8 100644
--- a/include/private/sapp_mem.h
+++ b/include/private/sapp_mem.h
@@ -46,14 +46,14 @@ typedef enum{
SAPP_MEM_DYN_PADDR, /* 每层地址所占的内存, 就不分具体协议了, 太麻烦了!! */
SAPP_MEM_DYN_DETAIN_PKT,
SAPP_MEM_DYN_SID_LIST,
-
+ SAPP_MEM_DYN_BLOOM_FILTER,
__SAPP_MEM_TYPE_MAX,
}sapp_mem_type_t;
-void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size);
-void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, int size);
+void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, unsigned int size);
+void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, unsigned int size);
void sapp_mem_free(sapp_mem_type_t type, int thread_seq, void *data);
-void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, int size);
+void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, unsigned int size);
void sapp_mem_stat_output(void);
#ifndef SAPP_FREE
@@ -69,6 +69,8 @@ void sapp_mem_stat_output(void);
#define SAPP_GLOBAL_FREE(mem) do{if(mem){sapp_mem_free(SAPP_MEM_FIX_GLOBAL_VAL, MEM_STAT_GLOBAL_THREAD_ID, (void *)mem); mem = NULL;}}while(0)
#endif
+int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_bytes);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/private/stream_internal.h b/include/private/stream_internal.h
index 90030b5..c57f5f0 100644
--- a/include/private/stream_internal.h
+++ b/include/private/stream_internal.h
@@ -523,8 +523,7 @@ void packet_io_device_alias_destroy(void);
void sapp_dup_pkt_destroy(void);
const char *stream_bridge_id_to_name(int bridge_id);
void sapp_printf(const char *fmt, ...);
-
-
+int sapp_mem_init(void);
#ifdef __cplusplus
}
diff --git a/module_test/src/CMakeLists.txt b/module_test/src/CMakeLists.txt
index a5ecba4..d041dad 100644
--- a/module_test/src/CMakeLists.txt
+++ b/module_test/src/CMakeLists.txt
@@ -7,6 +7,8 @@ include_directories(${PROJECT_SOURCE_DIR}/include/private)
include_directories(${PROJECT_SOURCE_DIR}/include/public)
include_directories(${PROJECT_SOURCE_DIR}/include/support)
include_directories(${PROJECT_SOURCE_DIR}/include/public/stream_inc)
+include_directories(${PROJECT_SOURCE_DIR}/src/support/dablooms/src)
+include_directories(${PROJECT_SOURCE_DIR}/src/support/ap_bloom/src)
add_definitions(-DSAPP_V4=1)
add_definitions(-fPIC)
@@ -14,9 +16,9 @@ add_definitions(-fPIC)
add_executable(gtest_sapp_v4 gtest_main.cpp gtest_sapp_ipv4.cpp gtest_sapp_ipv6.cpp gtest_sapp_tcp.cpp gtest_sapp_udp.cpp gtest_sapp_comm.cpp gtest_sapp_support.cpp gtest_sapp_plug_ctrl.cpp gtest_sapp_tunnel.cpp
gtest_transparent_run.cpp gtest_sapp_jump_layer.cpp gtest_inline_run.cpp gtest_sapp_asymmetric.cpp gtest_sapp_inject.cpp gtest_sapp_app_state.cpp
gtest_mpls.cpp gtest_vlan.cpp gtest_fake_marsio_run.cpp gtest_sapp_proxy.cpp gtest_sapp_pkt_dump.cpp
- ../test_case/auto_gen_test_functions.cpp ../test_case/test_function_common.cpp)
+ ../test_case/auto_gen_test_functions.cpp ../test_case/test_function_common.cpp gtest_sapp_bloom.cpp)
-target_link_libraries(gtest_sapp_v4 gtest-static ${SAPP_DEPEND_DYN_LIB} sapp_benchmark sapp_devel)
+target_link_libraries(gtest_sapp_v4 gtest-static ${SAPP_DEPEND_DYN_LIB} sapp_benchmark sapp_devel libdabloom)
add_executable(transparent_test_sapp_v4 gtest_transparent_env.cpp gtest_sapp_comm.cpp)
target_link_libraries(transparent_test_sapp_v4 pthread pcap m dl MESA_jump_layer MESA_handle_logger)
diff --git a/module_test/src/gtest_main.cpp b/module_test/src/gtest_main.cpp
index 913e1ef..6c0e475 100644
--- a/module_test/src/gtest_main.cpp
+++ b/module_test/src/gtest_main.cpp
@@ -2211,6 +2211,17 @@ TEST(performance, simple)
#endif
+#include "sapp_global_val.h"
+
+TEST(dabloom, simple)
+{
+ ASSERT_EQ(0, sapp_bloom_filter_test_run(BLOOM_LIBRARY_DABLOOM));
+}
+
+TEST(apbloom, simple)
+{
+ ASSERT_EQ(0, sapp_bloom_filter_test_run(BLOOM_LIBRARY_APBLOOM));
+}
static const char *gtest_cla_short_options = "hvLsf:l:";
diff --git a/module_test/src/gtest_sapp_bloom.cpp b/module_test/src/gtest_sapp_bloom.cpp
new file mode 100644
index 0000000..8f49c64
--- /dev/null
+++ b/module_test/src/gtest_sapp_bloom.cpp
@@ -0,0 +1,269 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <assert.h>
+#include <time.h>
+#include <arpa/inet.h>
+#include "stream.h"
+#include <sys/types.h> /* See NOTES */
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+#include "gtest_sapp_fun.h"
+#include <gtest/gtest.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <time.h>
+#include <pthread.h>
+#include "dablooms.h"
+#include "sapp_global_val.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+ void bloom_free_partition(void *bloom_filter, const sapp_dup_pkt_t *dup_conf);
+ void *bloom_new_partition(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms);
+ int bloom_check_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms);
+ void bloom_add_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms);
+#ifdef __cplusplus
+}
+#endif
+
+static const unsigned int BM_CAPACITY = 200000;
+static const double BM_ERROR_RATE = 0.000001;
+static const int BM_TIMEOUT = 10 * 1000; // ms
+static const int BM_PARTITION_NUM = 64;
+static const long BM_TRANSITION_TIME = 2 * 1000;
+
+static const int MAX_ITEM_NUM = 10000000;
+static const int ITEM_BATCH_NUM = 1000;
+#define TUPLE4_ADDR_LEN (12 + 1) // sizeof(tuple4) add begin char 'Y' or 'N'
+#define BM_TEST_MAX_THREAD (1)
+static pthread_t bm_test_thread_id[BM_TEST_MAX_THREAD];
+static pthread_t timer_thread_id;
+static volatile struct timespec g_current_time_sp = {};
+static volatile long long g_current_time_in_ms = 0;
+static struct timeval g_current_time_tv;
+static volatile int timer_thread_run = 1;
+
+static const unsigned int INIT_SIP = 0x12345678;
+static const unsigned int INIT_DIP = 0x87654321;
+static const unsigned short INIT_SPORT = 0x1234;
+static const unsigned short INIT_DPORT = 0x4321;
+
+static inline void bm_update_key(char *tuple4_buf, unsigned int index)
+{
+ unsigned int *p_sip = (unsigned int *)&tuple4_buf[1];
+ unsigned int *p_dip = (unsigned int *)&tuple4_buf[5];
+ unsigned short *p_sport = (unsigned short *)&tuple4_buf[9];
+ unsigned short *p_dport = (unsigned short *)&tuple4_buf[11];
+
+ *p_sip = INIT_SIP + index;
+ *p_dip = INIT_DIP + index;
+ *p_sport = INIT_SPORT + index;
+ *p_dport = INIT_DPORT + index;
+}
+
+static unsigned long long max_add_time = 0; // us
+static unsigned long long total_add_time = 0; // us
+static unsigned long long max_search_time = 0; // us
+static unsigned long long total_search_time = 0; // us
+
+// for test, only support one thread
+static struct timespec start_time, end_time;
+
+// return us
+static inline unsigned long long bm_time_diff(const struct timespec *start_time, const struct timespec *end_time)
+{
+ if (start_time->tv_sec == end_time->tv_sec)
+ {
+ return (unsigned long long)(end_time->tv_nsec - start_time->tv_nsec) / 1000;
+ }
+ return ((unsigned long long)end_time->tv_sec * 1000000 + end_time->tv_nsec / 1000) - ((unsigned long long)start_time->tv_sec * 1000000 + start_time->tv_nsec / 1000);
+}
+
+static void bm_add_item(void *bloom_filter, char *tuple4_buf, unsigned int index, const sapp_dup_pkt_t *dup_conf)
+{
+ bm_update_key(tuple4_buf, index);
+ clock_gettime(CLOCK_REALTIME, &start_time);
+ g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
+ g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+ bloom_add_partition(bloom_filter, tuple4_buf, TUPLE4_ADDR_LEN, dup_conf, g_current_time_tv, g_current_time_in_ms);
+ clock_gettime(CLOCK_REALTIME, &end_time);
+
+ unsigned long long time_diff = bm_time_diff(&start_time, &end_time);
+ if (time_diff > max_add_time)
+ {
+ max_add_time = time_diff;
+ }
+ total_add_time += (unsigned long long)time_diff;
+}
+
+static int bm_search_item(void *bloom_filter, char *tuple4_buf, unsigned int index, const sapp_dup_pkt_t *dup_conf)
+{
+ bm_update_key(tuple4_buf, index);
+ clock_gettime(CLOCK_REALTIME, &start_time);
+ g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
+ g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+ int ret = bloom_check_partition(bloom_filter, tuple4_buf, TUPLE4_ADDR_LEN, dup_conf, g_current_time_tv, g_current_time_in_ms);
+ clock_gettime(CLOCK_REALTIME, &end_time);
+ long long time_diff = bm_time_diff(&start_time, &end_time);
+ if (time_diff > max_search_time)
+ {
+ max_search_time = time_diff;
+ }
+ total_search_time += (unsigned long long)time_diff;
+ return ret;
+}
+
+static int bm_test(int bloom)
+{
+ sapp_dup_pkt_t dup_conf = {};
+ dup_conf.bloom_capacity = BM_CAPACITY;
+ dup_conf.bloom_error_rate = BM_ERROR_RATE;
+ dup_conf.bloom_timeout_ms = BM_TIMEOUT;
+ dup_conf.bloom_library = (enum bloom_library)bloom;
+ dup_conf.kickout_udp_stream_enabled = 1;
+ dup_conf.dup_pkt_distinguish_all_inject = 1;
+ dup_conf.dup_pkt_distinguish_ipv4_tcp = 1;
+ dup_conf.dup_pkt_distinguish_ipv4_udp = 1;
+ dup_conf.bloom_partition_num = BM_PARTITION_NUM;
+ dup_conf.transition_time_ms = BM_TRANSITION_TIME;
+
+ void *dabm_handle = bloom_new_partition(&dup_conf, g_current_time_tv, g_current_time_in_ms);
+ assert(dabm_handle);
+ char tuple4_buf[1024] = {};
+ long long search_y_error_num = 0;
+ long long search_n_error_num = 0;
+ time_t cur_time = time(NULL);
+ long long tot_collision_num = 0;
+
+ tuple4_buf[0] = 'Y';
+ bm_update_key(tuple4_buf, 0);
+
+ bloom_add_partition(dabm_handle, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ int hit = bloom_check_partition(dabm_handle, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+ if (hit <= 0)
+ {
+ fprintf(stderr, "not found tuple4!\n");
+ bloom_free_partition(dabm_handle, &dup_conf);
+ return -1;
+ }
+
+ printf("starting test, capacity:%u, timeout:%dms, partition:%d, batch_num:%d, max-items:%d\n",
+ BM_CAPACITY, BM_TIMEOUT, BM_PARTITION_NUM, ITEM_BATCH_NUM, MAX_ITEM_NUM);
+ int ret;
+ int add_index = 0, search_index_y = 0, search_index_n = 0;
+ while (add_index < MAX_ITEM_NUM || search_index_y < MAX_ITEM_NUM || search_index_n < MAX_ITEM_NUM)
+ {
+ tuple4_buf[0] = 'Y';
+ for (int b = 0; b < ITEM_BATCH_NUM && add_index < MAX_ITEM_NUM; b++, add_index++)
+ {
+ bm_add_item(dabm_handle, tuple4_buf, add_index, &dup_conf);
+ }
+
+ tuple4_buf[0] = 'Y';
+ for (int b = 0; b < ITEM_BATCH_NUM && search_index_y < MAX_ITEM_NUM; b++, search_index_y++)
+ {
+ ret = bm_search_item(dabm_handle, tuple4_buf, search_index_y, &dup_conf);
+ if (ret <= 0) // expect exist
+ {
+ search_y_error_num++;
+ }
+ }
+
+ tuple4_buf[0] = 'N';
+ for (int b = 0; b < ITEM_BATCH_NUM && search_index_n < MAX_ITEM_NUM; b++, search_index_n++)
+ {
+ ret = bm_search_item(dabm_handle, tuple4_buf, search_index_n, &dup_conf);
+ if (ret > 0) // expect not exist
+ {
+ search_n_error_num++;
+ }
+ }
+ }
+
+ printf("add %lld items success, avg-time:%.3fus, max:%lluus\n",
+ (long long)MAX_ITEM_NUM, (double)total_add_time / (double)MAX_ITEM_NUM, max_add_time);
+
+ printf("search %lld items , avg-time:%.3fus, max:%lluus\n",
+ (long long)MAX_ITEM_NUM, (double)total_search_time / (double)MAX_ITEM_NUM, max_search_time);
+
+ double err_rate = (double)search_n_error_num / (double)MAX_ITEM_NUM;
+ printf("search_y_error_num:%lld \nsearch_n_error_num:%lld, errer-rate:%f\n",
+ search_y_error_num, search_n_error_num, err_rate);
+
+ bloom_free_partition(dabm_handle, &dup_conf);
+
+ return search_y_error_num + (err_rate > (double)BM_ERROR_RATE);
+}
+
+static void *bm_test_thread(void *arg)
+{
+ int bloom = (int)(long)arg;
+ if (bm_test(bloom) != 0)
+ {
+ return (void *)"error";
+ }
+ return (void *)"success";
+}
+
+static inline long long get_curtime_in_us(void)
+{
+ struct timespec curts;
+ clock_gettime(CLOCK_MONOTONIC, &curts);
+ return curts.tv_sec * 1000000 + curts.tv_nsec / 1000;
+}
+
+static inline void nssleep(long ns)
+{
+ struct timespec req = {0, ns};
+ nanosleep(&req, NULL);
+}
+
+static void *timer_thread(void *arg)
+{
+ while (timer_thread_run)
+ {
+ clock_gettime(CLOCK_REALTIME, (struct timespec *)&g_current_time_sp);
+ g_current_time_in_ms = g_current_time_sp.tv_sec * 1000 + g_current_time_sp.tv_nsec / 1000000;
+ nssleep(10);
+ }
+ return NULL;
+}
+
+int sapp_bloom_filter_test_run(int bloom)
+{
+ int ret = 0;
+ pthread_create(&timer_thread_id, NULL, timer_thread, NULL);
+
+ for (int i = 0; i < BM_TEST_MAX_THREAD; i++)
+ {
+ pthread_create(&bm_test_thread_id[i], NULL, bm_test_thread, (void *)(long)bloom);
+ }
+
+ void *thread_result;
+ for (int i = 0; i < BM_TEST_MAX_THREAD; i++)
+ {
+ pthread_join(bm_test_thread_id[i], &thread_result);
+ if (thread_result != "success")
+ {
+ printf("thread %d test failed\n", i);
+ ret = -1;
+ }
+ }
+ timer_thread_run = 0;
+ pthread_cancel(timer_thread_id);
+ pthread_join(timer_thread_id, NULL);
+
+ return ret;
+}
diff --git a/module_test/src/gtest_sapp_fun.h b/module_test/src/gtest_sapp_fun.h
index 7d4bab7..b40932c 100644
--- a/module_test/src/gtest_sapp_fun.h
+++ b/module_test/src/gtest_sapp_fun.h
@@ -569,6 +569,7 @@ void sapp_deal_proxy_kill_tcp_run(void);
/**************************** pkt_dump *************************************/
int test_pkt_dump_run(void);
+int sapp_bloom_filter_test_run(int);
void append_entry_list(const char *entryname);
int check_sapp_version(void);
diff --git a/module_test/src/gtest_sapp_support.cpp b/module_test/src/gtest_sapp_support.cpp
index c58dd81..f4f6a75 100644
--- a/module_test/src/gtest_sapp_support.cpp
+++ b/module_test/src/gtest_sapp_support.cpp
@@ -501,32 +501,20 @@ void control_platform_opt_run(void)
void control_stream_create_timestamp_ms_run(void)
{
- const char *argv[4];
-
- argv[0] = (char *)"./sapp ";
- argv[1] = (char *)"--dumpfile";
- argv[2] = (char *)"--dumpfile-speed=timestamp";
- argv[3] = NULL;
-
set_default_config();
set_project_list_conf_default();
update_config_file("etc/sapp.toml", "syn_mandatory", "0");
update_config_file("etc/sapp.toml", "timeout", "0");
update_config_file_by_lastline("etc/sapp.toml", "packet_io.polling","enabled", "1");
- update_plugin_inf("TCP_ALL", "stream_createtime_ms_tcpall_entry");
- /*
- ����һ��pcap��, ʹ�����dz���, ʹ��--dumpfile-speed=timestamp����,��֤ƽ̨��ʱ�䲻�˳�!
- �����pcap_onlineģʽ����lo��,�dz�����û������,���²�����ᱻ����.
- */
+ update_plugin_inf("IP", "iplayer_readjust_time_entry");
+ append_plugin_inf("TCP_ALL", "stream_createtime_ms_tcpall_entry");
+
set_pcap_dumpfile("tcp/tcp_simple.pcap");
ASSERT_EQ(file_md5_checksum("dumpfile", "df138740a6a22ca9c977052f21f7a470"),0);
- printf("test for stream_create_timestamp_ms, set --dumpfile-speed=timestamp please wait for a monment......\n");
-
- call_libsapp_devel_with_args(3, argv);
-
+ call_libsapp_devel_for_dumpfile_topspeed();
}
diff --git a/module_test/src/gtest_sapp_support_plug.cpp b/module_test/src/gtest_sapp_support_plug.cpp
index 19158b1..371d969 100644
--- a/module_test/src/gtest_sapp_support_plug.cpp
+++ b/module_test/src/gtest_sapp_support_plug.cpp
@@ -1033,8 +1033,6 @@ extern "C" char stream_createtime_ms_tcpall_entry(struct streaminfo *pstream,voi
int opt_len = sizeof(long long);
if(OP_STATE_PENDING == pstream->pktstate){
- sleep(1);
- usleep(123456);
ret = MESA_get_stream_opt(pstream, MSO_STREAM_CREATE_TIMESTAMP_MS, &create_time_ms_pending, &opt_len);
if(ret < 0){
printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_CREATE_TIMESTAMP_MS error!\033[0m\n");
@@ -1069,20 +1067,13 @@ extern "C" char stream_createtime_ms_tcpall_entry(struct streaminfo *pstream,voi
return APP_STATE_DROPME;
}
- if(((pstream->ptcpdetail->createtime != create_time_ms_pending/1000)
- && (pstream->ptcpdetail->createtime != create_time_ms_pending/1000 - 1))/* 防止在执行的瞬间跨过了一秒 */
- || pstream->ptcpdetail->createtime != create_time_s){
- printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_CREATE_TIMESTAMP_MS is:%llu, but current time is:%llu\033[0m\n", create_time_ms_pending, pstream->ptcpdetail->createtime);
- sendto_test_result(GTEST_SAPP_ERR);
- return APP_STATE_DROPME;
- }
ret = MESA_get_stream_opt(pstream, MSO_STREAM_LASTUPDATE_TIMESTAMP_MS, &lastm_time_ms2, &opt_len);
if(ret < 0){
printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_LASTUPDATE_TIMESTAMP_MS error!\033[0m\n");
sendto_test_result(GTEST_SAPP_ERR);
return APP_STATE_DROPME;
}
- if(lastm_time_ms2 == lastm_time_ms1){
+ if(lastm_time_ms2 <= lastm_time_ms1){
printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_LASTUPDATE_TIMESTAMP_MS pending:%llu diff with close:%llu\033[0m\n", lastm_time_ms1, lastm_time_ms2);
sendto_test_result(GTEST_SAPP_ERR);
return APP_STATE_DROPME;
diff --git a/src/common/sapp_mem.c b/src/common/sapp_mem.c
index e921de0..aeb6b36 100644
--- a/src/common/sapp_mem.c
+++ b/src/common/sapp_mem.c
@@ -49,9 +49,10 @@ static const mem_stat_name_tuple_t mem_stat_name_tuple[] =
{SAPP_MEM_DYN_PADDR , "layer_addr"},
{SAPP_MEM_DYN_DETAIN_PKT , "detain_pkt"},
{SAPP_MEM_DYN_SID_LIST , "segment_id_list"},
+ {SAPP_MEM_DYN_BLOOM_FILTER , "bloom_filter"},
};
-void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size)
+void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, unsigned int size)
{
char *ptr;
sapp_private_mem_t *mhdr;
@@ -82,7 +83,7 @@ void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size)
}
-void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, int size)
+void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, unsigned int size)
{
char *ptr;
sapp_private_mem_t *mhdr;
@@ -137,7 +138,7 @@ void sapp_mem_free(sapp_mem_type_t type, int thread_seq, void *data)
free((void *)mhdr);
}
-void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, int size)
+void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, unsigned int size)
{
void *new_ptr;
sapp_private_mem_t *mhdr;
@@ -258,7 +259,7 @@ void sapp_mem_stat_output(void)
}
for(stat_index = __SAPP_FIX_DYN_SEPARATOR + 1 ; stat_index < __SAPP_MEM_TYPE_MAX; stat_index++){
- if(dyn_realtime_stat.mem_used_block[stat_index] > 0){
+ if(dyn_realtime_stat.mem_used_block[stat_index] != 0){
fprintf(fp, "%7s %18s %18lld %18lld %18s\n",
"",
mem_stat_name_tuple[stat_index].mem_type_string,
@@ -287,6 +288,37 @@ void sapp_mem_stat_output(void)
fclose(fp);
}
+char bloomfilter_mem_stat_polling_entry(struct streaminfo *nouse1, void **nouse2, int thread_seq, void *nouse3)
+{
+ static time_t last_get_time[SAPP_MAX_THREADS] = {};
+
+ if(ABBR_CURRENT_TIME <= last_get_time[thread_seq])
+ {
+ return POLLING_STATE_IDLE;
+ }
+
+ long long blocks = 0, bytes = 0;
+ bloomfilter_get_mem_stat(thread_seq, &blocks, &bytes);
+
+ sapp_global_val->mthread_volatile[thread_seq]->mem_used_stat.mem_used_block[SAPP_MEM_DYN_BLOOM_FILTER] = blocks;
+ sapp_global_val->mthread_volatile[thread_seq]->mem_used_stat.mem_used_bytes[SAPP_MEM_DYN_BLOOM_FILTER] = bytes;
+
+ last_get_time[thread_seq] = ABBR_CURRENT_TIME;
+ return POLLING_STATE_WORK;
+}
+
+int sapp_mem_init(void)
+{
+ if(sapp_global_val->config.packet_io.dup_pkt_para.kickout_udp_stream_enabled
+ || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_tcp
+ || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_udp
+ || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject)
+ {
+ stream_register_fun(FUN_TYPE_POLLING, (char (*)(void))bloomfilter_mem_stat_polling_entry, 0);
+ }
+ return 0;
+}
+
#ifdef __cplusplus
}
#endif
diff --git a/src/config/config_parse.cpp b/src/config/config_parse.cpp
index 6b72ea4..a6d1277 100644
--- a/src/config/config_parse.cpp
+++ b/src/config/config_parse.cpp
@@ -1319,6 +1319,22 @@ static int config_sanity_check(void)
return -1;
}
+ if (pconfig->packet_io.dup_pkt_para.kickout_udp_stream_enabled
+ || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_tcp
+ || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_udp
+ || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject)
+ {
+ if (pconfig->packet_io.dup_pkt_para.bloom_partition_num <= 0)
+ {
+ sapp_log(RLOG_LV_FATAL, ~0, ~0, "bloom_partition_num is:%d, must be greater than 0 !", pconfig->packet_io.dup_pkt_para.bloom_partition_num);
+ return -1;
+ }
+
+ if(pconfig->packet_io.dup_pkt_para.transition_time_ms > pconfig->packet_io.dup_pkt_para.bloom_timeout_ms/2){
+ sapp_log(RLOG_LV_INFO, ~0, ~0, "bloom_transition_time is greater than bloom_timeout/2\n");
+ }
+ }
+
/******************************* CPU ********************************/
cur_cpu_num = get_nprocs();
if(cur_cpu_num < pconfig->cpu.worker_threads){
@@ -1735,13 +1751,18 @@ int sapp_parse_config(void)
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.traffic.inject", (char *)"inject_all_enabled", &pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject, 0);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_capacity", &pconfig->packet_io.dup_pkt_para.bloom_capacity, 1000000);
- tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_timeout", &pconfig->packet_io.dup_pkt_para.bloom_timeout, 60);
+ int tmp_dm_timeout;
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_timeout", &tmp_dm_timeout, 60 * 1000);
+ pconfig->packet_io.dup_pkt_para.bloom_timeout_ms = (long)tmp_dm_timeout * 1000;
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_transition_time", &tmp_dm_timeout, 3);
+ pconfig->packet_io.dup_pkt_para.transition_time_ms = (long)tmp_dm_timeout * 1000;
+
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_slice_num", &pconfig->packet_io.dup_pkt_para.bloom_slice_num, 3);
tomlc99_wrap_load_string_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_error_rate", str_tmp, sizeof(str_tmp), "0.000001");
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_library", (int *)&pconfig->packet_io.dup_pkt_para.bloom_library, 1);
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"first_packets", (int *)&pconfig->packet_io.dup_pkt_para.first_packets, 3);
+ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_partition_num", (int *)&pconfig->packet_io.dup_pkt_para.bloom_partition_num, 1);
pconfig->packet_io.dup_pkt_para.bloom_error_rate = strtod(str_tmp, NULL);
-
/******************************* packet_io.under_ddos ******************************/
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"packet_io.under_ddos", (char *)"stream_bypass_enabled", &pconfig->packet_io.under_ddos_config.enabled, 0); //��ǰ����, Ĭ�ϲ�����
tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"packet_io.under_ddos", (char *)"get_cpu_usage_interval", &pconfig->packet_io.under_ddos_config.get_cpu_usage_interval, 500);
diff --git a/src/dealpkt/duplicate_pkt_distinguish.c b/src/dealpkt/duplicate_pkt_distinguish.c
index 1a904b6..fa7a64c 100644
--- a/src/dealpkt/duplicate_pkt_distinguish.c
+++ b/src/dealpkt/duplicate_pkt_distinguish.c
@@ -12,35 +12,41 @@ extern "C" {
识别因路由策略导致的重复流量, 首次收到数据包加入bloom filter,
每个流的前N个包扫描bloom filter, 重复的数据包直接转发即可.
- ip层的ttl, hop不能算为key, 因为经过了几跳路由器又回绕了, ttl肯定会变化, 而上层的数据包确实是重复的.
+ ip层的ttl, hop不能算为key, 因为经过了几跳路由器又回绕了, ttl肯定会变�?, 而上层的数据包确实是重复�?.
*/
/*
2021-05-18 lijia close ipv6 protocol:
重复流量识别bloom filter句柄, 根据流量方向和协议的不同, 功能分为三类,
- IPv6因为没有Ipid字段, 无法区别真正的应用层重传和重复流量, 开启了会导致断网或者CT, 所以不支持!!!
+ IPv6因为没有Ipid字段, 无法区别真正的应用层重传和重复流�?, 开启了会导致断网或者CT, 所以不支持!!!
- 对于IPv6的两种情况:
- 1)如果开启了代理策略, 重传包被识别成了重复包的话, sapp就直接PASS了, 实际那个包是真实通信双方发的, 没有经过tfe处理, 可能就断网了.
- 2)如果开启了firewall的drop策略, 第一次drop成功了, 但应用层会重传, 重传包被识别成了重复包的话, sapp就直接PASS了, 导致CT.
+ 对于IPv6的两种情�?:
+ 1)如果开启了代理策略, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 实际那个包是真实通信双方发的, 没有经过tfe处理, 可能就断网了.
+ 2)如果开启了firewall的drop策略, 第一次drop成功�?, 但应用层会重�?, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 导致CT.
*/
+/* 24.04, split bloomfilter into multiple partition to avoid blocking caused by memset large memory */
+struct bloom_partition
+{
+ void **bm_handle;
+ int bm_partition_num;
+};
-static inline void *bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now)
+static inline void *_bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
{
switch(dup_conf->bloom_library)
{
case BLOOM_LIBRARY_APBLOOM:
- return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout*1000, dup_conf->bloom_slice_num);
+ return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout_ms, dup_conf->bloom_slice_num);
case BLOOM_LIBRARY_DABLOOM:
- return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now.tv_sec, dup_conf->bloom_timeout);
+ return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now_ms, dup_conf->bloom_timeout_ms, dup_conf->transition_time_ms);
default:
return NULL;
}
}
-static inline void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf)
+static inline void _bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf)
{
switch(dup_conf->bloom_library)
{
@@ -54,7 +60,56 @@ static inline void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf
return;
}
-static inline int bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now)
+void bloom_free_partition(void *bloom_filter, const sapp_dup_pkt_t *dup_conf)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter;
+ int i;
+ for(i = 0; i < bm_partition->bm_partition_num; i++){
+ if(bm_partition->bm_handle[i]){
+ _bloom_free(bm_partition->bm_handle[i], dup_conf);
+ }
+ }
+ free(bm_partition);
+ return;
+}
+
+void *bloom_new_partition(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
+{
+ int i;
+ struct bloom_partition *bm_partition = (struct bloom_partition *)calloc(1, sizeof(struct bloom_partition));
+ bm_partition->bm_handle = (void **)calloc(dup_conf->bloom_partition_num, sizeof(void *));
+
+ long base_timeout = dup_conf->bloom_timeout_ms;
+ int polarity = 1;
+ sapp_dup_pkt_t local_dup_conf = {};
+ memcpy(&local_dup_conf, dup_conf, sizeof(sapp_dup_pkt_t));
+ srand(time(NULL));
+ for(i = 0; i < dup_conf->bloom_partition_num; i++){
+ local_dup_conf.bloom_timeout_ms = base_timeout + (random() % 1000) * polarity; // spread out expire time
+ polarity *= -1;
+ if(dup_conf->bloom_timeout_ms < 0){
+ local_dup_conf.bloom_timeout_ms = base_timeout;
+ }
+ bm_partition->bm_handle[i] = _bloom_new(&local_dup_conf, now, now_ms);
+ if(NULL == bm_partition->bm_handle[i]){
+ break;
+ }
+ }
+ if(i != dup_conf->bloom_partition_num){
+ for(i = 0; i < dup_conf->bloom_partition_num; i++){
+ if(bm_partition->bm_handle[i]){
+ _bloom_free(bm_partition->bm_handle[i], dup_conf);
+ }
+ }
+ free(bm_partition);
+ return NULL;
+ }
+ bm_partition->bm_partition_num = dup_conf->bloom_partition_num;
+ return bm_partition;
+}
+
+
+static inline int _bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
{
switch(dup_conf->bloom_library)
{
@@ -63,13 +118,29 @@ static inline int bloom_check(void *bloom_filter, const char *key, int key_len,
now, (const char *)key, key_len);
case BLOOM_LIBRARY_DABLOOM:
return expiry_dablooms_search((struct expiry_dablooms_handle *)bloom_filter,
- (const char *)key, key_len, (time_t)now.tv_sec);
+ (const char *)key, key_len, now_ms);
default:
return 0;
}
}
-static inline void bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now)
+static inline unsigned int bloom_parttion_kiss_hash(const unsigned char *key, int key_len)
+{
+ unsigned int hash = 127;
+ for(int i = 0; i < key_len; i++){
+ hash += (unsigned int )key[i];
+ }
+ return (hash & 0x7ffffff);
+}
+
+int bloom_check_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter;
+ int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num;
+ return _bloom_check(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms);
+}
+
+static inline void _bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
{
switch(dup_conf->bloom_library)
{
@@ -78,7 +149,7 @@ static inline void bloom_add(void *bloom_filter, const char *key, int key_len, c
now, (const char *)key, key_len);
case BLOOM_LIBRARY_DABLOOM:
expiry_dablooms_add((struct expiry_dablooms_handle *)bloom_filter,
- (const char *)key, key_len, (time_t)now.tv_sec);
+ (const char *)key, key_len, now_ms);
break;
default:
break;
@@ -86,6 +157,52 @@ static inline void bloom_add(void *bloom_filter, const char *key, int key_len, c
return;
}
+void bloom_add_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter;
+ int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num;
+ return _bloom_add(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms);
+}
+
+int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_bytes)
+{
+ struct bloom_partition *bm_partition = (struct bloom_partition *)sapp_global_val->mthread_volatile[tseq]->dup_pkt_distinguish_handle;
+
+ if(NULL == bm_partition){
+ *mem_blocks = 0;
+ *mem_bytes = 0;
+ return 0;
+ }
+
+ switch (sapp_global_val->config.packet_io.dup_pkt_para.bloom_library)
+ {
+ case BLOOM_LIBRARY_APBLOOM:
+ // todo,
+ *mem_blocks = 0;
+ *mem_bytes = 0;
+ return -1;
+ break;
+ case BLOOM_LIBRARY_DABLOOM:
+ {
+ long long tmp_blocks = 0;
+ long long tmp_bytes = 0;
+ *mem_blocks = 0;
+ *mem_bytes = 0;
+ for (int i = 0; i < bm_partition->bm_partition_num; i++)
+ {
+ expiry_dablooms_get_mem_stat(bm_partition->bm_handle[i], &tmp_blocks, &tmp_bytes);
+ *mem_blocks += tmp_blocks;
+ *mem_bytes += tmp_bytes;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
int sapp_dup_stream_search(struct streaminfo *a_stream)
{
void *key=NULL;
@@ -102,7 +219,7 @@ int sapp_dup_stream_search(struct streaminfo *a_stream)
}
struct timeval now={g_CurrentTime, 0};
- return bloom_check(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ return bloom_check_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
}
@@ -121,7 +238,7 @@ void sapp_dup_stream_add(struct streaminfo *a_stream)
key_len=sizeof(struct stream_tuple4_v6);
}
struct timeval now={g_CurrentTime, 0};
- bloom_add(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ bloom_add_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
}
static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_type_t stream_type, const void *iphdr,
@@ -141,8 +258,8 @@ static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_t
if (stream_type == STREAM_TYPE_UDP)
{
const struct mesa_udp_hdr *uhdr = (const struct mesa_udp_hdr *)l4_hdr;
- l4->tcp_seq = 0x5A5A5A5A; /* udp没有, 但设成0可能对某些hash函数不友好, 此处设个固定值 */
- l4->tcp_ack = 0xA5A5A5A5; /* udp没有, 但设成0可能对某些hash函数不友好, 此处设个固定值 */
+ l4->tcp_seq = 0x5A5A5A5A; /* udp没有, 但设�?0可能对某些hash函数不友�?, 此处设个固定�? */
+ l4->tcp_ack = 0xA5A5A5A5; /* udp没有, 但设�?0可能对某些hash函数不友�?, 此处设个固定�? */
l4->sport = uhdr->uh_sport;
l4->dport = uhdr->uh_dport;
l4->checksum = uhdr->uh_sum;
@@ -182,12 +299,12 @@ int sapp_dup_pkt_identify(int tid, struct streaminfo_private *pstream_pr,
struct timeval now={g_CurrentTime, 0};
size_t key_len = sapp_set_dup_pkt_key((enum addr_type_t)pstream_pr->stream_public.addr.addrtype, (enum stream_type_t)pstream_pr->stream_public.type, ip_hdr, (void *)l4_hdr, &dup_bloom_key);
- is_dup_pkt= bloom_check(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ is_dup_pkt= bloom_check_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
if(is_dup_pkt != 0){
pstream_pr->has_duplicate_pkt = 1;
}else{
if(need_add_bloom_filter){
- bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
}
}
return is_dup_pkt;
@@ -200,7 +317,7 @@ void sapp_dup_pkt_mark_l4(struct streaminfo *a_stream, const void *ip_hdr, const
int ret = 0;
struct timeval now={g_CurrentTime, 0};
key_len = sapp_set_dup_pkt_key((enum addr_type_t)a_stream->addr.addrtype, (enum stream_type_t)a_stream->type, ip_hdr, l4_hdr, &dup_bloom_key);
- bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now);
+ bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms);
return;
}
@@ -216,12 +333,12 @@ int sapp_dup_pkt_init(void)
return 0;
}
- /* 流量入口有三个开关, 识别句柄只有一个,
- 拆分句柄多耗费内存, 且判断时每个包需要扫描多个句柄,
+ /* 流量入口有三个开�?, 识别句柄只有一�?,
+ 拆分句柄多耗费内存, 且判断时每个包需要扫描多个句�?,
可能好处只是准确率高了一点点而已 */
struct timeval now={time(NULL), 0};
for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){
- sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new(p_dup_conf, now);
+ sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new_partition(p_dup_conf, now, g_current_time_ms);
if(NULL == sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){
sapp_log(RLOG_LV_FATAL, 1, 1, "dup_pkt_distinguish init error!");
return -1;
@@ -238,7 +355,7 @@ void sapp_dup_pkt_destroy(void)
const sapp_dup_pkt_t *p_dup_conf = &sapp_global_val->config.packet_io.dup_pkt_para;
for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){
if(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){
- bloom_free(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf);
+ bloom_free_partition(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf);
}
}
diff --git a/src/sapp_dev/sapp_init.c b/src/sapp_dev/sapp_init.c
index f6ef015..5c1d247 100644
--- a/src/sapp_dev/sapp_init.c
+++ b/src/sapp_dev/sapp_init.c
@@ -303,6 +303,8 @@ int MESA_platform_init(int argc, char *argv[])
sapp_set_current_state(SAPP_STATE_PKT_IO_INITED);
+ sapp_mem_init();
+
sapp_runtime_log(RLOG_LV_DEBUG, "sapp platform init success");
return 0;
diff --git a/src/support/ap_bloom/src/ap_bloom.c b/src/support/ap_bloom/src/ap_bloom.c
index 79cf47d..4064e3e 100644
--- a/src/support/ap_bloom/src/ap_bloom.c
+++ b/src/support/ap_bloom/src/ap_bloom.c
@@ -158,12 +158,14 @@ struct ap_state
int consecutive_matches;
int visited_slices;
int hash_num;
+ int counter;
UT_array slice_time_events;
};
void ap_state_init(struct ap_state *state, int hash_num)
{
state->consecutive_matches=0;
state->visited_slices=0;
+ state->counter=0;
state->hash_num=hash_num;
utarray_init(&state->slice_time_events, &slice_event_icd);
utarray_reserve(&state->slice_time_events, hash_num*2);
@@ -172,6 +174,7 @@ void ap_state_clear(struct ap_state *state)
{
state->consecutive_matches=0;
state->visited_slices=0;
+ state->counter=0;
utarray_clear(&state->slice_time_events);
}
void ap_state_done(struct ap_state *state)
@@ -188,6 +191,7 @@ int ap_state_is_match(struct ap_state *state)
{
return 1;
}
+ if(state->counter >= state->hash_num)return 1;
utarray_sort(&state->slice_time_events, slice_event_cmp);
while((ev=(struct ap_slice_event*)utarray_next(&state->slice_time_events, ev)))
{
@@ -221,6 +225,10 @@ static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double
ev.hash_index = slice->hash_index;
utarray_push_back(&state->slice_time_events, &ev);
found=1;
+ if(timercmp(&slice->last_insert, &slice->first_insert, >))
+ {
+ state->counter++;
+ }
}
state->visited_slices++;
}
diff --git a/src/support/dablooms/CMakeLists.txt b/src/support/dablooms/CMakeLists.txt
index f7e8ecf..58369a1 100644
--- a/src/support/dablooms/CMakeLists.txt
+++ b/src/support/dablooms/CMakeLists.txt
@@ -1,4 +1,4 @@
cmake_minimum_required(VERSION 2.8...3.10)
add_definitions(-fPIC)
-add_library(libdabloom STATIC src/dablooms.c src/murmur.c) \ No newline at end of file
+add_library(libdabloom STATIC src/dablooms.c src/murmur.c src/mem_util.c) \ No newline at end of file
diff --git a/src/support/dablooms/src/dablooms.c b/src/support/dablooms/src/dablooms.c
index 9f52a26..fbdcd31 100755
--- a/src/support/dablooms/src/dablooms.c
+++ b/src/support/dablooms/src/dablooms.c
@@ -12,36 +12,39 @@
#include <unistd.h>
#include <errno.h>
#include <time.h>
+#include <assert.h>
#include "murmur.h"
#include "dablooms.h"
+#include "mem_util.h"
-#define DABLOOMS_VERSION "0.9.1"
+#define DABLOOMS_VERSION "1.0.1"
#define ERROR_TIGHTENING_RATIO 0.5
#define SALT_CONSTANT 0x97c29b3a
-#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
-#define FREE(p) {free(p);p=NULL;}
+#define ALLOC(type, number, mstat) ((type *)dabm_calloc(number, sizeof(type), mstat))
+#define FREE(p, mstat) dabm_free(p, mstat)
+#define REALLOC(p, size, mstat) dabm_realloc(p, size, mstat)
const char *dablooms_version(void)
{
return DABLOOMS_VERSION;
}
-void free_bitmap(bitmap_t *bitmap)
+static void free_bitmap(bitmap_t *bitmap, struct dabm_mem_stat *mstat)
{
#if 0
if ((munmap(bitmap->array, bitmap->bytes)) < 0) {
perror("Error, unmapping memory");
}
#else
- FREE(bitmap->array);
+ FREE(bitmap->array, mstat);
#endif
- FREE(bitmap);
+ FREE(bitmap, mstat);
}
-bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
+static bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size, struct dabm_mem_stat *mstat)
{
#if 0
@@ -74,18 +77,20 @@ bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
#else
if (bitmap->array != NULL)
{
- bitmap->array = (char *)realloc(bitmap->array, new_size);
- if (bitmap->array == NULL)
- {
- perror("Error resizing memory");
- free_bitmap(bitmap);
- return NULL;
- }
+ bitmap->array = (char *)REALLOC(bitmap->array, new_size, mstat);
+ assert(bitmap->array);
+ // if (bitmap->array == NULL)
+ // {
+ // perror("Error resizing memory");
+ // free_bitmap(bitmap, mstat);
+ // return NULL;
+ // }
memset(bitmap->array + old_size, 0, new_size - old_size);
- }
+ //printf("bitmap_resize(), %p, new:%d, old:%d\n", bitmap,(int)new_size, (int)old_size);
+ }
else
{
- bitmap->array = ALLOC(char, new_size);
+ bitmap->array = ALLOC(char, new_size, mstat);
}
#endif
bitmap->bytes = new_size;
@@ -94,22 +99,22 @@ bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size)
/* Create a new bitmap, not full featured, simple to give
* us a means of interacting with the 4 bit counters */
-bitmap_t *new_bitmap(size_t bytes)
+static bitmap_t *new_bitmap(size_t bytes, struct dabm_mem_stat *mstat)
{
bitmap_t *bitmap;
- bitmap = ALLOC(bitmap_t, 1);
+ bitmap = ALLOC(bitmap_t, 1, mstat);
bitmap->bytes = bytes;
- if ((bitmap = bitmap_resize(bitmap, 0, bytes)) == NULL) {
+ if ((bitmap = bitmap_resize(bitmap, 0, bytes, mstat)) == NULL) {
return NULL;
}
return bitmap;
}
-int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset)
+static inline int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset)
{
long access = index / 2 + offset;
uint8_t temp;
@@ -158,7 +163,7 @@ int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset)
}
/* decrements the four bit counter */
-int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset)
+static inline int bitmap_check(const bitmap_t *bitmap, unsigned int index, long offset)
{
long access = index / 2 + offset;
if (index % 2 != 0 ) {
@@ -168,19 +173,19 @@ int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset)
}
}
-int bitmap_flush(bitmap_t *bitmap)
-{
-#if 0
- if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) {
- perror("Error, flushing bitmap to disk");
- return -1;
- } else {
- return 0;
- }
-#else
- return 0;
-#endif
-}
+// int bitmap_flush(bitmap_t *bitmap)
+// {
+// #if 0
+// if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) {
+// perror("Error, flushing bitmap to disk");
+// return -1;
+// } else {
+// return 0;
+// }
+// #else
+// return 0;
+// #endif
+// }
/*
* Perform the actual hashing for `key`
@@ -191,36 +196,34 @@ int bitmap_flush(bitmap_t *bitmap)
* See paper by Kirsch, Mitzenmacher [2006]
* http://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf
*/
-void hash_func(counting_bloom_t *bloom, const char *key, size_t key_len, uint32_t *hashes)
+static inline void hash_func(uint32_t *hashes, const uint32_t checksum[4], size_t nfuncs, uint32_t counts_per_func)
{
- uint32_t checksum[4];
-
- MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum);
+ // uint32_t checksum[4];
+ // MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum);
uint32_t h1 = checksum[0];
uint32_t h2 = checksum[1];
-
- for (size_t i = 0; i < bloom->nfuncs; i++) {
- hashes[i] = (h1 + i * h2) % bloom->counts_per_func;
+ for (size_t i = 0; i < nfuncs; i++) {
+ hashes[i] = (h1 + i * h2) % counts_per_func;
}
}
-int free_counting_bloom(counting_bloom_t *bloom)
-{
- if (bloom != NULL) {
- FREE(bloom->hashes);
- bloom->hashes = NULL;
- free_bitmap(bloom->bitmap);
- FREE(bloom);
- bloom = NULL;
- }
- return 0;
-}
+// int free_counting_bloom(counting_bloom_t *bloom)
+// {
+// if (bloom != NULL) {
+// FREE(bloom->hashes);
+// bloom->hashes = NULL;
+// free_bitmap(bloom->bitmap);
+// FREE(bloom);
+// bloom = NULL;
+// }
+// return 0;
+// }
-counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset)
+counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset, struct dabm_mem_stat *mstat)
{
counting_bloom_t *bloom;
- bloom = ALLOC(counting_bloom_t, 1);
+ bloom = ALLOC(counting_bloom_t, 1, mstat);
bloom->bitmap = NULL;
bloom->capacity = capacity;
bloom->error_rate = error_rate;
@@ -230,30 +233,32 @@ counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate,
bloom->size = bloom->nfuncs * bloom->counts_per_func;
/* rounding-up integer divide by 2 of bloom->size */
bloom->num_bytes = ((bloom->size + 1) / 2) + sizeof(counting_bloom_header_t);
- bloom->hashes = ALLOC(uint32_t, bloom->nfuncs);
+ bloom->hashes = ALLOC(uint32_t, bloom->nfuncs, mstat);
return bloom;
}
-counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate)
+static counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate, struct dabm_mem_stat *mstat)
{
counting_bloom_t *cur_bloom;
- cur_bloom = counting_bloom_init(capacity, error_rate, 0);
- cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes);
+ cur_bloom = counting_bloom_init(capacity, error_rate, 0, mstat);
+ cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes, mstat);
cur_bloom->header = (counting_bloom_header_t *)(cur_bloom->bitmap->array);
return cur_bloom;
}
-int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len)
+static int counting_bloom_add(counting_bloom_t *bloom, const uint32_t checksum[4])
{
unsigned int index, i, offset;
unsigned int *hashes = bloom->hashes;
+ size_t nfuncs = bloom->nfuncs;
+ uint32_t counts_per_func = bloom->counts_per_func;
+
+ hash_func(hashes, checksum, nfuncs, counts_per_func);
- hash_func(bloom, s, len, hashes);
-
- for (i = 0; i < bloom->nfuncs; i++) {
- offset = i * bloom->counts_per_func;
+ for (i = 0; i < nfuncs; i++) {
+ offset = i * counts_per_func;
index = hashes[i] + offset;
bitmap_increment(bloom->bitmap, index, bloom->offset);
}
@@ -262,32 +267,34 @@ int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len)
return 0;
}
-int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len)
-{
- unsigned int index, i, offset;
- unsigned int *hashes = bloom->hashes;
+// int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len)
+// {
+// unsigned int index, i, offset;
+// unsigned int *hashes = bloom->hashes;
- hash_func(bloom, s, len, hashes);
+// hash_func(bloom, s, len, hashes);
- for (i = 0; i < bloom->nfuncs; i++) {
- offset = i * bloom->counts_per_func;
- index = hashes[i] + offset;
- bitmap_decrement(bloom->bitmap, index, bloom->offset);
- }
- bloom->header->count--;
+// for (i = 0; i < bloom->nfuncs; i++) {
+// offset = i * bloom->counts_per_func;
+// index = hashes[i] + offset;
+// bitmap_decrement(bloom->bitmap, index, bloom->offset);
+// }
+// bloom->header->count--;
- return 0;
-}
+// return 0;
+// }
-int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
+static int counting_bloom_check(const counting_bloom_t *bloom, const uint32_t checksum[4])
{
unsigned int index, i, offset;
unsigned int *hashes = bloom->hashes;
+ size_t nfuncs = bloom->nfuncs;
+ uint32_t counts_per_func = bloom->counts_per_func;
+
+ hash_func(hashes, checksum, nfuncs, counts_per_func);
- hash_func(bloom, s, len, hashes);
-
- for (i = 0; i < bloom->nfuncs; i++) {
- offset = i * bloom->counts_per_func;
+ for (i = 0; i < nfuncs; i++) {
+ offset = i * counts_per_func;
index = hashes[i] + offset;
if (!(bitmap_check(bloom->bitmap, index, bloom->offset))) {
return 0;
@@ -296,39 +303,40 @@ int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len)
return 1;
}
-int free_scaling_bloom(scaling_bloom_t *bloom)
+static int free_scaling_bloom(scaling_bloom_t *bloom, struct dabm_mem_stat *mstat)
{
int i;
for (i = bloom->num_blooms - 1; i >= 0; i--) {
- FREE(bloom->blooms[i]->hashes);
+ FREE(bloom->blooms[i]->hashes, mstat);
bloom->blooms[i]->hashes = NULL;
- FREE(bloom->blooms[i]);
+ FREE(bloom->blooms[i], mstat);
bloom->blooms[i] = NULL;
}
- FREE(bloom->blooms);
- free_bitmap(bloom->bitmap);
- FREE(bloom);
+ FREE(bloom->blooms, mstat);
+ free_bitmap(bloom->bitmap, mstat);
+ FREE(bloom, mstat);
return 0;
}
/* creates a new counting bloom filter from a given scaling bloom filter, with count and id */
-counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom)
+counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom, struct dabm_mem_stat *mstat)
{
long offset;
double error_rate;
counting_bloom_t *cur_bloom;
error_rate = bloom->error_rate * (pow(ERROR_TIGHTENING_RATIO, bloom->num_blooms + 1));
-
- if ((bloom->blooms = (counting_bloom_t **)realloc(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *))) == NULL) {
- fprintf(stderr, "Error, could not realloc a new bloom filter\n");
- return NULL;
- }
-
- cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes);
+ bloom->blooms = (counting_bloom_t **)REALLOC(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *), mstat);
+ assert(bloom->blooms);
+ // if (unlikely(bloom->blooms == NULL)) {
+ // fprintf(stderr, "Error, could not realloc a new bloom filter\n");
+ // return NULL;
+ // }
+
+ cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes ,mstat);
bloom->blooms[bloom->num_blooms] = cur_bloom;
- bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes);
+ bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes, mstat);
/* reset header pointer, as mmap may have moved */
bloom->header = (scaling_bloom_header_t *) bloom->bitmap->array;
@@ -351,17 +359,18 @@ uint64_t scaling_bloom_clear_seqnums(scaling_bloom_t *bloom)
{
uint64_t seqnum;
- if (bloom->header->disk_seqnum != 0) {
- // disk_seqnum cleared on disk before any other changes
- bloom->header->disk_seqnum = 0;
- bitmap_flush(bloom->bitmap);
- }
+ // if (bloom->header->disk_seqnum != 0) {
+ // // disk_seqnum cleared on disk before any other changes
+ // bloom->header->disk_seqnum = 0;
+ // // bitmap_flush(bloom->bitmap);
+ // }
+ bloom->header->disk_seqnum = 0;
seqnum = bloom->header->mem_seqnum;
bloom->header->mem_seqnum = 0;
return seqnum;
}
-int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
+static int scaling_bloom_add(scaling_bloom_t *bloom, const uint32_t checksum[4], uint64_t id, struct dabm_mem_stat *mstat)
{
int i;
uint64_t seqnum;
@@ -377,84 +386,84 @@ int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_
seqnum = scaling_bloom_clear_seqnums(bloom);
if ((id > bloom->header->max_id) && (cur_bloom->header->count >= cur_bloom->capacity - 1)) {
- cur_bloom = new_counting_bloom_from_scale(bloom);
+ cur_bloom = new_counting_bloom_from_scale(bloom, mstat);
cur_bloom->header->count = 0;
cur_bloom->header->id = bloom->header->max_id + 1;
}
if (bloom->header->max_id < id) {
bloom->header->max_id = id;
}
- counting_bloom_add(cur_bloom, s, len);
+ counting_bloom_add(cur_bloom, checksum);
bloom->header->mem_seqnum = seqnum + 1;
return 1;
}
-int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
-{
- counting_bloom_t *cur_bloom;
- int i;
- uint64_t seqnum;
+// int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id)
+// {
+// counting_bloom_t *cur_bloom;
+// int i;
+// uint64_t seqnum;
- for (i = bloom->num_blooms - 1; i >= 0; i--) {
- cur_bloom = bloom->blooms[i];
- if (id >= cur_bloom->header->id) {
- seqnum = scaling_bloom_clear_seqnums(bloom);
+// for (i = bloom->num_blooms - 1; i >= 0; i--) {
+// cur_bloom = bloom->blooms[i];
+// if (id >= cur_bloom->header->id) {
+// seqnum = scaling_bloom_clear_seqnums(bloom);
- counting_bloom_remove(cur_bloom, s, len);
+// counting_bloom_remove(cur_bloom, s, len);
- bloom->header->mem_seqnum = seqnum + 1;
- return 1;
- }
- }
- return 0;
-}
-
-int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len)
+// bloom->header->mem_seqnum = seqnum + 1;
+// return 1;
+// }
+// }
+// return 0;
+// }
+
+static int scaling_bloom_check(scaling_bloom_t *bloom, const uint32_t checksum[4])
{
int i;
counting_bloom_t *cur_bloom;
for (i = bloom->num_blooms - 1; i >= 0; i--) {
cur_bloom = bloom->blooms[i];
- if (counting_bloom_check(cur_bloom, s, len)) {
+ if (counting_bloom_check(cur_bloom, checksum)) {
return 1;
}
}
return 0;
}
-int scaling_bloom_flush(scaling_bloom_t *bloom)
-{
- if (bitmap_flush(bloom->bitmap) != 0) {
- return -1;
- }
- // all changes written to disk before disk_seqnum set
- if (bloom->header->disk_seqnum == 0) {
- bloom->header->disk_seqnum = bloom->header->mem_seqnum;
- return bitmap_flush(bloom->bitmap);
- }
- return 0;
-}
-
-uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom)
-{
- return bloom->header->mem_seqnum;
-}
-
-uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom)
-{
- return bloom->header->disk_seqnum;
-}
-
-scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate)
+// int scaling_bloom_flush(scaling_bloom_t *bloom)
+// {
+// if (bitmap_flush(bloom->bitmap) != 0) {
+// return -1;
+// }
+// // all changes written to disk before disk_seqnum set
+// if (bloom->header->disk_seqnum == 0) {
+// bloom->header->disk_seqnum = bloom->header->mem_seqnum;
+// return bitmap_flush(bloom->bitmap);
+// }
+// return 0;
+// }
+
+// uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom)
+// {
+// return bloom->header->mem_seqnum;
+// }
+
+// uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom)
+// {
+// return bloom->header->disk_seqnum;
+// }
+
+scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate,struct dabm_mem_stat *mstat)
{
scaling_bloom_t *bloom;
- bloom = ALLOC(scaling_bloom_t, 1);
- if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t))) == NULL) {
+ bloom = ALLOC(scaling_bloom_t, 1, mstat);
+ if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t), mstat)) == NULL) {
fprintf(stderr, "Error, Could not create bitmap with file\n");
- free_scaling_bloom(bloom);
+ free_scaling_bloom(bloom, mstat);
return NULL;
}
@@ -468,19 +477,20 @@ scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate)
return bloom;
}
-scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate)
+static scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate, struct dabm_mem_stat *mstat)
{
scaling_bloom_t *bloom;
counting_bloom_t *cur_bloom;
- bloom = scaling_bloom_init(capacity, error_rate);
-
- if (!(cur_bloom = new_counting_bloom_from_scale(bloom))) {
- fprintf(stderr, "Error, Could not create counting bloom\n");
- free_scaling_bloom(bloom);
- return NULL;
- }
+ bloom = scaling_bloom_init(capacity, error_rate, mstat);
+ cur_bloom = new_counting_bloom_from_scale(bloom, mstat);
+ // if (!()) {
+ // fprintf(stderr, "Error, Could not create counting bloom\n");
+ // free_scaling_bloom(bloom, mstat);
+ // return NULL;
+ // }
+ assert(cur_bloom);
cur_bloom->header->count = 0;
cur_bloom->header->id = 0;
@@ -492,15 +502,17 @@ scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate)
struct expiry_dablooms_handle{
scaling_bloom_t *cur_bloom;
scaling_bloom_t *next_bloom;
- time_t cur_bloom_start;
- time_t next_bloom_start;
- time_t last_bloom_check;
+ long cur_bloom_start_ms;
+ long next_bloom_start_ms;
+ long last_bloom_check_ms;
uint64_t cur_bloom_inc_id;
uint64_t next_bloom_inc_id;
unsigned int capacity;
- int expiry_time;
- time_t cur_time;
+ long expiry_time_ms;
+ long cur_time_ms;
+ long transition_time_ms;
double error_rate;
+ struct dabm_mem_stat mstat;
};
char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){
@@ -517,28 +529,29 @@ char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){
void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle){
if(handle != NULL){
if(handle->cur_bloom != NULL){
- free_scaling_bloom(handle->cur_bloom);
+ free_scaling_bloom(handle->cur_bloom, &handle->mstat);
}
if(handle->next_bloom != NULL){
- free_scaling_bloom(handle->next_bloom);
+ free_scaling_bloom(handle->next_bloom, &handle->mstat);
}
- FREE(handle);
+ free(handle);
}
}
-struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time){
- struct expiry_dablooms_handle *handle = ALLOC(struct expiry_dablooms_handle, 1);
- scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate);
+struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms){
+ struct expiry_dablooms_handle *handle = (struct expiry_dablooms_handle *)calloc(1, sizeof(struct expiry_dablooms_handle));
+ scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate, &handle->mstat);
if(cur_bloom == NULL){
goto error_out;
}
handle->cur_bloom = cur_bloom;
handle->cur_bloom_inc_id = 0;
- handle->cur_bloom_start=cur_time;
+ handle->cur_bloom_start_ms=cur_time_ms;
handle->capacity = capacity;
handle->error_rate = error_rate;
- handle->expiry_time = expiry_time;
- handle->cur_time = cur_time;
+ handle->expiry_time_ms = expiry_time_ms;
+ handle->cur_time_ms = cur_time_ms;
+ handle->transition_time_ms = transition_time_ms;
return handle;
error_out:
@@ -546,91 +559,123 @@ error_out:
return NULL;
}
-int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){
- if(handle == NULL || handle->cur_bloom == NULL){
- return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
+// int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){
+// if(handle == NULL || handle->cur_bloom == NULL){
+// return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
+// }
+// *count = handle->cur_bloom_inc_id;
+// return 0;
+// }
+
+static inline int bloom_in_transition_period(const struct expiry_dablooms_handle *handle, long cur_time_ms)
+{
+ if((cur_time_ms - handle->cur_bloom_start_ms) + handle->transition_time_ms < handle->expiry_time_ms){
+ return 0;
}
- *count = handle->cur_bloom_inc_id;
- return 0;
+ return 1;
}
-static int bloom_expired_check(struct expiry_dablooms_handle *handle, time_t cur_time){
- if(handle == NULL || handle->cur_bloom == NULL){
- return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
- }
- if(cur_time <= handle->last_bloom_check){
+static int bloom_expired_check(struct expiry_dablooms_handle *handle, long cur_time_ms, struct dabm_mem_stat *mstat){
+ // if(unlikely(handle == NULL || handle->cur_bloom == NULL)){
+ // return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL;
+ // }
+ if(likely(cur_time_ms <= handle->last_bloom_check_ms + 1000)){
return 0;
}
- time_t delta_time = cur_time - handle->cur_bloom_start;
- handle->cur_time=cur_time;
- if(delta_time >= handle->expiry_time){
- free_scaling_bloom(handle->cur_bloom);
+ long delta_time_ms = cur_time_ms - handle->cur_bloom_start_ms;
+ handle->cur_time_ms=cur_time_ms;
+ if(unlikely(delta_time_ms >= handle->expiry_time_ms)){
+ free_scaling_bloom(handle->cur_bloom, mstat);
if(handle->next_bloom != NULL){
handle->cur_bloom = handle->next_bloom;
- handle->cur_bloom_start = handle->next_bloom_start;
+ handle->cur_bloom_start_ms = handle->next_bloom_start_ms;
handle->cur_bloom_inc_id = handle->next_bloom_inc_id;
handle->next_bloom = NULL;
- handle->last_bloom_check=0;
+ handle->last_bloom_check_ms=0;
}
else{
- scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate);
- if(cur_bloom == NULL){
+ scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate, mstat);
+ if(unlikely(cur_bloom == NULL)){
return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL;
}
handle->cur_bloom = cur_bloom;
handle->cur_bloom_inc_id = 0;
- handle->cur_bloom_start=cur_time;
- handle->last_bloom_check=0;
+ handle->cur_bloom_start_ms=cur_time_ms;
+ handle->last_bloom_check_ms=0;
}
}
else
{
- handle->last_bloom_check=cur_time;
+ handle->last_bloom_check_ms=cur_time_ms;
}
return 0;
}
-int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time){
- if(key==NULL || len ==0 || handle == NULL)
- {
- return -1;
- }
- int ret = bloom_expired_check(handle, cur_time);
- if(ret < 0)
+int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){
+ assert(!(key==NULL || len ==0 || handle == NULL));
+ int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat);
+ if(unlikely(ret < 0))
{
return ret;
}
+ __builtin_prefetch(key, 0, 0);
+ uint32_t checksum[4];
+ MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum);
+
+ long delta_time_ms = cur_time_ms - handle->cur_bloom_start_ms;
+ handle->cur_time_ms=cur_time_ms;
- scaling_bloom_add(handle->cur_bloom, key, len, handle->cur_bloom_inc_id);
- handle->cur_bloom_inc_id++;
- time_t delta_time = cur_time - handle->cur_bloom_start;
- handle->cur_time=cur_time;
- if(delta_time >= handle->expiry_time){
- if(handle->next_bloom == NULL){
- scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate);
+ int is_transition = bloom_in_transition_period(handle, cur_time_ms);
+ if(unlikely(is_transition != 0)){
+ if(unlikely(handle->next_bloom == NULL)){
+ scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate, &handle->mstat);
if(next_bloom == NULL){
return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL;
}
handle->next_bloom = next_bloom;
handle->next_bloom_inc_id = 0;
- handle->next_bloom_start=cur_time;
+ handle->next_bloom_start_ms=cur_time_ms;
}
- scaling_bloom_add(handle->next_bloom, key, len, handle->next_bloom_inc_id);
+ scaling_bloom_add(handle->next_bloom, checksum, handle->next_bloom_inc_id, &handle->mstat);
handle->next_bloom_inc_id++;
+ }else{
+ scaling_bloom_add(handle->cur_bloom, checksum, handle->cur_bloom_inc_id, &handle->mstat);
+ handle->cur_bloom_inc_id++;
}
+
return 0;
}
-int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time){
- if(key==NULL || len ==0 || handle == NULL)
- {
- return -1;
- }
- int ret = bloom_expired_check(handle, cur_time);
- if(ret < 0)
+int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){
+ assert(!(key==NULL || len ==0 || handle == NULL));
+ int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat);
+ if(unlikely(ret < 0))
{
return ret;
}
- int bloom_hit = scaling_bloom_check(handle->cur_bloom, key, len);
- return bloom_hit;
+
+ __builtin_prefetch(key, 0, 0);
+ uint32_t checksum[4];
+ MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum);
+
+ int bloom_hit_cur = 0;
+ int bloom_hit_next = 0;
+ int is_transition = bloom_in_transition_period(handle, cur_time_ms);
+
+ if(unlikely(is_transition != 0) && (handle->next_bloom != NULL)){
+ bloom_hit_next = scaling_bloom_check(handle->next_bloom, checksum);
+ }
+
+ if(likely(0 == bloom_hit_next)){
+ bloom_hit_cur = scaling_bloom_check(handle->cur_bloom, checksum);
+ }
+
+ return bloom_hit_cur || bloom_hit_next;
}
+
+int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes)
+{
+ *blocks = handle->mstat.blocks;
+ *bytes = handle->mstat.bytes;
+ return 0;
+} \ No newline at end of file
diff --git a/src/support/dablooms/src/dablooms.h b/src/support/dablooms/src/dablooms.h
index 017fb53..f93a9a0 100755
--- a/src/support/dablooms/src/dablooms.h
+++ b/src/support/dablooms/src/dablooms.h
@@ -5,6 +5,18 @@
#include <stdint.h>
#include <stdlib.h>
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef likely
+#define likely(x) __builtin_expect(!!(x), 1)
+#endif
+
+#ifndef unlikely
+#define unlikely(x) __builtin_expect(!!(x), 0)
+#endif
+
const char *dablooms_version(void);
typedef struct {
@@ -13,15 +25,15 @@ typedef struct {
} bitmap_t;
-bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
-bitmap_t *new_bitmap(size_t bytes);
+// bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size);
+// bitmap_t *new_bitmap(size_t bytes);
-int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
+// int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset);
-int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset);
+// int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset);
int bitmap_flush(bitmap_t *bitmap);
-void free_bitmap(bitmap_t *bitmap);
+// void free_bitmap(bitmap_t *bitmap);
typedef struct {
uint64_t id;
@@ -43,11 +55,11 @@ typedef struct {
bitmap_t *bitmap;
} counting_bloom_t;
-int free_counting_bloom(counting_bloom_t *bloom);
-counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate);
-int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
+// int free_counting_bloom(counting_bloom_t *bloom);
+// counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate);
+// int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len);
int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len);
-int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
+// int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len);
typedef struct {
uint64_t max_id;
@@ -65,11 +77,11 @@ typedef struct {
bitmap_t *bitmap;
} scaling_bloom_t;
-scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
-int free_scaling_bloom(scaling_bloom_t *bloom);
-int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
+// scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate);
+// int free_scaling_bloom(scaling_bloom_t *bloom);
+// int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id);
-int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
+// int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len);
int scaling_bloom_flush(scaling_bloom_t *bloom);
uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom);
uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom);
@@ -81,9 +93,13 @@ enum expiry_dablooms_errno{
};
char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno);
void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle);
-struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time);
+struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms);
int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count);
int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time);
+int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes);
+#ifdef __cplusplus
+}
+#endif
#endif
diff --git a/src/support/dablooms/src/mem_util.c b/src/support/dablooms/src/mem_util.c
new file mode 100644
index 0000000..0978018
--- /dev/null
+++ b/src/support/dablooms/src/mem_util.c
@@ -0,0 +1,101 @@
+#include <sys/stat.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <math.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include <pthread.h>
+#include "murmur.h"
+#include "dablooms.h"
+#include "mem_util.h"
+
+struct dabm_mem_hdr{
+ size_t size;
+ char *user_ptr;
+};
+
+void *dabm_malloc(size_t user_size, struct dabm_mem_stat *mstat)
+{
+ size_t real_size = user_size + sizeof(struct dabm_mem_hdr);
+ char *real_ptr = (char *)malloc(real_size);
+ if (real_ptr)
+ {
+ struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)real_ptr;
+ stat->size = real_size;
+ stat->user_ptr = real_ptr + sizeof(struct dabm_mem_hdr);
+ mstat->blocks++;
+ mstat->bytes += real_size;
+ return stat->user_ptr;
+ }
+ return NULL;
+}
+
+void *dabm_calloc(size_t nmemb, size_t user_size, struct dabm_mem_stat *mstat)
+{
+ size_t real_size = nmemb * user_size + sizeof(struct dabm_mem_hdr);
+ char *real_ptr = (char *)calloc(1, real_size);
+ if (real_ptr)
+ {
+ struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)real_ptr;
+ stat->size = real_size;
+ stat->user_ptr = real_ptr + sizeof(struct dabm_mem_hdr);
+ mstat->blocks++;
+ mstat->bytes += real_size;
+ return stat->user_ptr;
+ }
+ return NULL;
+}
+
+void dabm_free(void *user_ptr, struct dabm_mem_stat *mstat)
+{
+ if (user_ptr)
+ {
+ struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)((char *)user_ptr - sizeof(struct dabm_mem_hdr));
+ mstat->blocks--;
+ mstat->bytes -= stat->size;
+ free(stat);
+ }
+}
+
+void *dabm_realloc(void *user_ptr, size_t new_size, struct dabm_mem_stat *mstat)
+{
+ char *old_real_ptr;
+ struct dabm_mem_hdr *stat;
+
+ if (user_ptr)
+ {
+ stat = (struct dabm_mem_hdr *)((char *)user_ptr - sizeof(struct dabm_mem_hdr));
+ old_real_ptr = (char *)stat;
+ mstat->bytes -= stat->size;
+ }
+ else
+ {
+ old_real_ptr = NULL;
+ mstat->blocks++; // no memory alloc before
+ }
+
+ size_t real_size = new_size + sizeof(struct dabm_mem_hdr);
+ char *new_real_ptr = (char *)realloc(old_real_ptr, real_size);
+ if (new_real_ptr)
+ {
+ mstat->bytes += real_size;
+ stat = (struct dabm_mem_hdr *)new_real_ptr;
+ stat->size = real_size;
+ stat->user_ptr = new_real_ptr + sizeof(struct dabm_mem_hdr);
+ return stat->user_ptr;
+ }
+
+ return NULL;
+}
+
+int dabm_get_mem_hdr_size(void)
+{
+ return sizeof(struct dabm_mem_hdr);
+}
+
diff --git a/src/support/dablooms/src/mem_util.h b/src/support/dablooms/src/mem_util.h
new file mode 100644
index 0000000..f038392
--- /dev/null
+++ b/src/support/dablooms/src/mem_util.h
@@ -0,0 +1,22 @@
+#ifndef _DABM_MEM_UTIL_H_
+#define _DABM_MEM_UTIL_H_ 1
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+ struct dabm_mem_stat
+ {
+ long blocks;
+ long bytes;
+ };
+
+ void *dabm_malloc(size_t size, struct dabm_mem_stat *mstat);
+ void *dabm_calloc(size_t nmemb, size_t size, struct dabm_mem_stat *mstat);
+ void dabm_free(void *user_ptr, struct dabm_mem_stat *mstat);
+ void *dabm_realloc(void *ptr, size_t new_size, struct dabm_mem_stat *mstat);
+ int dabm_get_mem_hdr_size(void);
+#ifdef __cplusplus
+}
+#endif
+#endif \ No newline at end of file
diff --git a/src/support/dablooms/src/test_mem_util.c b/src/support/dablooms/src/test_mem_util.c
new file mode 100644
index 0000000..f8176d1
--- /dev/null
+++ b/src/support/dablooms/src/test_mem_util.c
@@ -0,0 +1,63 @@
+#include <sys/stat.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <math.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+#include <time.h>
+#include "murmur.h"
+#include "dablooms.h"
+#include "mem_util.h"
+
+int main(int argc, char const *argv[])
+{
+ int i;
+ struct dabm_mem_stat mstat = {};
+
+ char *ptr = (char *)dabm_malloc(100, &mstat);
+ assert(ptr != NULL);
+
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+ dabm_free(ptr, &mstat);
+ assert(mstat.blocks == 0);
+ assert(mstat.bytes == 0);
+
+ int *iptr = (int *)dabm_calloc(100, sizeof(int), &mstat);
+ assert(iptr != NULL);
+ for (i = 0; i < 100; i++)
+ {
+ assert(iptr[i] == 0);
+ }
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+ dabm_free(iptr, &mstat);
+
+ ptr = dabm_realloc(NULL, 100, &mstat);
+ assert(ptr != NULL);
+
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+ memset(ptr, 0xAA, 100);
+
+ unsigned char *uptr = (unsigned char *)dabm_realloc(ptr, 1000, &mstat);
+ for (i = 0; i < 100; i++)
+ {
+ assert(uptr[i] == 0xAA);
+ }
+ assert(mstat.blocks == 1);
+ assert(mstat.bytes == 100 + dabm_get_mem_hdr_size());
+
+ dabm_free(uptr, &mstat);
+ assert(mstat.blocks == 0);
+ assert(mstat.bytes == 0);
+
+ printf("test mem util success!\n");
+ return 0;
+}