diff options
23 files changed, 1017 insertions, 321 deletions
diff --git a/benchmark/sapp_default_config/etc/sapp.toml b/benchmark/sapp_default_config/etc/sapp.toml index 0c1c32e..2805521 100644 --- a/benchmark/sapp_default_config/etc/sapp.toml +++ b/benchmark/sapp_default_config/etc/sapp.toml @@ -133,6 +133,8 @@ # 0:disable bloom filter, 1:dabloom, 2:apbloom #bloom_library=1 first_packets=3 + bloom_partition_num=1 + [STREAM] ### note, stream_id_base_time format is "%Y-%m-%d %H:%M:%S" diff --git a/bin/etc/sapp.toml b/bin/etc/sapp.toml index 0c1c32e..50fbfa5 100644 --- a/bin/etc/sapp.toml +++ b/bin/etc/sapp.toml @@ -133,6 +133,8 @@ # 0:disable bloom filter, 1:dabloom, 2:apbloom #bloom_library=1 first_packets=3 + bloom_partition_num=16 + bloom_transition_time=3 [STREAM] ### note, stream_id_base_time format is "%Y-%m-%d %H:%M:%S" diff --git a/include/private/duplicate_pkt_distinguish.h b/include/private/duplicate_pkt_distinguish.h index dffd6e8..2a437cf 100644 --- a/include/private/duplicate_pkt_distinguish.h +++ b/include/private/duplicate_pkt_distinguish.h @@ -4,10 +4,10 @@ #include "mesa_net.h" #include "stream_internal.h" -/* 网络序, network order */ +/* 网络�?, network order */ -/* ipv4和ipv6分别定义, 因为ipv6地址巨大, 比例也相对较少, - 统一使用union定义, 都会按最大长度计算, 影响性能. +/* ipv4和ipv6分别定义, 因为ipv6地址巨大, 比例也相对较�?, + 统一使用union定义, 都会按最大长度计�?, 影响性能. 
*/ struct sapp_dup_key_ip4_tuple{ unsigned int sip; @@ -29,7 +29,7 @@ struct __sapp_dup_pkt_key_l4{ struct __sapp_dup_pkt_key_v4{ struct __sapp_dup_pkt_key_l4 l4; - unsigned short ip_id; /* TCP层SYN重传时, 或者应用层重传数据, ipid是不一样的, 其实不能算为重复包, 所以ipid也作为Key */ + unsigned short ip_id; /* TCP层SYN重传�?, 或者应用层重传数据, ipid是不一样的, 其实不能算为重复�?, 所以ipid也作为Key */ struct sapp_dup_key_ip4_tuple ip_addr; }__attribute__((packed, aligned(1))); diff --git a/include/private/sapp_global_val.h b/include/private/sapp_global_val.h index b4b41da..80caa54 100644 --- a/include/private/sapp_global_val.h +++ b/include/private/sapp_global_val.h @@ -18,7 +18,7 @@ enum pkt_dump_mode{ /* - vxlan��vlan_flipping�ǿ��Թ����?, + vxlan��vlan_flipping�ǿ��Թ����?, vxlan������������, vlan_flipping���ڱ����Լ�����, */ @@ -42,7 +42,7 @@ typedef struct{ typedef struct{ int worker_threads; - int send_only_threads_max_num; /* �������߳��������?, �Dz����̬������?, sapp������Ԥ֪, ֻ��������ֵ */ + int send_only_threads_max_num; /* �������߳��������?, �Dz����̬������?, sapp������Ԥ֪, ֻ��������ֵ */ int bind_mask_array_num; long bind_mask_array[SAPP_MAX_THREADS]; long bind_mask; /* bind_mask_array�е�ÿ��������Ϊbit_indexת�ɵ�����ֵ */ @@ -117,7 +117,7 @@ typedef struct{ int timeout; int meaningful_statistics_minimum_pkt; int meaningful_statistics_minimum_byte; - unsigned short *well_known_ports_array; /* �����½�����ȷ��server��, ����4001->8000, ʵ������OICQ��C2S�������ݰ�,֮ǰ�İ汾�ᱻ��ʶ��ΪS2C�����? */ + unsigned short *well_known_ports_array; /* �����½�����ȷ��server��, ����4001->8000, ʵ������OICQ��C2S�������ݰ�,֮ǰ�İ汾�ᱻ��ʶ��ΪS2C�����? 
*/ int well_known_ports_array_num; int max_timeouts_per_sec; int max_opening_per_sec; @@ -161,9 +161,9 @@ typedef struct{ int process_latency_refresh_interval_s; int process_latency_threshold_in_us; /* �����ļ��ﵥλ��microsecond, ��Ҫ����1000 */ /* ��ȷģʽ������, fs2����ͳ��Ҳ�dz���CPU, ÿ������Ҫ����FS_operate(), - ���ͣ�þ�ȷģ�?, ����fs2���DZ������һ��ʱ�������?, ��Сֵ����һ��ʱ���ƽ���?, ������FS_operate()����, + ���ͣ�þ�ȷģ�?, ����fs2���DZ������һ��ʱ�������?, ��Сֵ����һ��ʱ���ƽ���?, ������FS_operate()����, ��ȷ�Ƚ�����һЩ, ���������˵���FS_operate()�Ĵ���. - ����pkt_latency_accurate_enable��ƽ��ֵ�����ǶԵ�, ������Ͳ���?. + ����pkt_latency_accurate_enable��ƽ��ֵ�����ǶԵ�, ������Ͳ���?. */ int process_latency_clock_id; @@ -191,7 +191,7 @@ enum __sapp_config_packet_io_deployment_mode{ SAPP_CFG_PKT_IO_DEPOLY_MODE_TRANSPARENT, }; -#define MAX_VLAN_FLIPPING_MAP_NUM (4096) /* vlan_id���Χ��?0~4095, ���鳤����4096 */ +#define MAX_VLAN_FLIPPING_MAP_NUM (4096) /* vlan_id���Χ��?0~4095, ���鳤����4096 */ typedef struct{ unsigned short couple_vlan_id; /* vlan flipping�dzɶԴ���, �����±��ʾһ��vlanid, Ԫ�ص�ֵ����һ��vlanid, host order */ @@ -221,9 +221,11 @@ typedef struct{ int dup_pkt_distinguish_ipv4_tcp; int dup_pkt_distinguish_ipv4_udp; int bloom_capacity; - int bloom_timeout; int bloom_slice_num; + long bloom_timeout_ms; enum bloom_library bloom_library; + int bloom_partition_num; + long transition_time_ms; double bloom_error_rate; int first_packets; }sapp_dup_pkt_t; @@ -242,17 +244,17 @@ typedef struct{ sapp_config_packet_io_tunnel_t packet_io_tunnel; const char *input_bpf_filter; char deployment_mode_str[NAME_MAX]; /* [mirror, inline, transparent, dumpfile] */ - enum sapp_deploment_mode_t deployment_mode_bin; /* ��������ڴ���?, ת����ֵ��ʽ, ��1:mirror; 2:transparent; 3:inline��, ���?: enum sapp_deploment_mode_t */ + enum sapp_deploment_mode_t deployment_mode_bin; /* ��������ڴ���?, ת����ֵ��ʽ, ��1:mirror; 2:transparent; 3:inline��, ���?: enum sapp_deploment_mode_t */ sapp_config_packet_io_dev_t internal; 
sapp_config_packet_io_dev_t external; char pcap_dumpfile_name[NAME_MAX]; int polling_priority; /* call sapp_recv_pkt every call polling_entry times, ���ö��ٴ�polling�����һ��recv pkt, 1��ʾ�������ȼ���ͬ */ int work_percent; int without_usleep; /* ���ܲ���ģʽ, �������usleep, CPU��100% */ - int inbound_route_dir; /* ��ʾ�뾳, I2C�����ֵ��?0����1 */ + int inbound_route_dir; /* ��ʾ�뾳, I2C�����ֵ��?0����1 */ char pcap_capture_direction[NAME_MAX]; /* in, out, inout */ char inject_pkt_mode_string[NAME_MAX]; - int inject_pkt_mode; /* ��ע�뷽ʽ������ֵ, ���?: enum send_fake_packet_mode */ + int inject_pkt_mode; /* ��ע�뷽ʽ������ֵ, ���?: enum send_fake_packet_mode */ int inject_pkt_prepend_segment_id; unsigned short inject_mode_inline_device_sport; /* udp socket�ı���Դ�˿�, host order, ������ */ char inject_mode_single_gateway_device[NAME_MAX]; @@ -306,7 +308,7 @@ typedef struct{ const char *cfg_file_inline_dev_relative; const char *cfg_file_inline_dev_absolute; - const char *cfg_file_necessary_plug_relative; /* ��Ҫ����б�?, �������ʧ��sapp���˳� */ + const char *cfg_file_necessary_plug_relative; /* ��Ҫ����б�?, �������ʧ��sapp���˳� */ const char *cfg_file_necessary_plug_absolute; const char *cfg_file_stream_compare_layer_relative; @@ -357,7 +359,7 @@ typedef struct{ typedef struct{ int ipv6_decapsulation_enabled; /* �Ƿ����ipv6���ݰ� */ - int ipv6_send_packet_enabled; /* �Ƿ�֧�ַ���ipv6���ݰ�, ��������ϵͳipv6�ں�ģ���Ƿ����? */ + int ipv6_send_packet_enabled; /* �Ƿ�֧�ַ���ipv6���ݰ�, ��������ϵͳipv6�ں�ģ���Ƿ����? 
*/ int tcp_drop_pure_ack_pkt; /* ����û�и��صĴ�ack��, ���Խ�Լһ��������ѯ, ����ҵ���������� */ int tcp_syn_option_parse_enabled; /* �Ƿ����tcp syn��ͷ��ѡ�� */ int skip_not_ip_layer_over_eth; /* ������ip��, ��֤�ڲ���ģʽ��, ����ͨ��ϵͳ·�ɷ���rst�� */ @@ -422,15 +424,15 @@ typedef struct{ pthread_t thread_timer_loop_id; int cpu_bind_core_id_per_thread[SAPP_MAX_THREADS]; /* ���������bind_mask, ��¼ÿ��IO�����̰߳�cpu core id */ void *breakpad; - char overlay_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾ��ԭʼ���Ļ����Ϸ�װ������, ��vxlan, ��Щ�㲻Ӧ�õ��ò��?, ��עʱ��Ҫ����� */ - char prune_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾע���ʱ��Ҫ�����IJ�?, ���ڴ�������������ļ�?: skip_not_iop_layer, ����vlan, mpls��, ����mirrorģʽ��, ������ע��㲻һ�������绷��? */ + char overlay_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾ��ԭʼ���Ļ����Ϸ�װ������, ��vxlan, ��Щ�㲻Ӧ�õ��ò��?, ��עʱ��Ҫ����� */ + char prune_layer_def[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; /* ��ʾע���ʱ��Ҫ�����IJ�?, ���ڴ�������������ļ�?: skip_not_iop_layer, ����vlan, mpls��, ����mirrorģʽ��, ������ע��㲻һ�������绷��? 
*/ void *under_ddos_handle; pthread_t gdev_keepalive_log_thread_id; }sapp_gval_individual_fixed_t; typedef struct{ - unsigned long long count[SAPP_STAT_NUM_MAX]; /* ������, �����?, �½�������, ������������ */ + unsigned long long count[SAPP_STAT_NUM_MAX]; /* ������, �����?, �½�������, ������������ */ unsigned long long length[SAPP_STAT_NUM_MAX]; /* ������, �����ݰ����ֽ��� */ unsigned long long count_per_layer[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; unsigned long long length_per_layer[__ADDR_TYPE_MAX][SAPP_SUPPORT_LAYER_NUM_MAX]; @@ -457,7 +459,7 @@ struct __sapp_gval_mthread{ sapp_fuzzy_latency_stat_t fuzzy_pkt_latency_stat_per_thread; sapp_fuzzy_latency_stat_t fuzzy_pkt_latency_stat_per_entry_per_thread[SAPP_MAX_PLUG_ENTRY_NUM]; const raw_pkt_t *raw_pkt; /* ��ΪMESA_fakepacket_send_xxxϵ�к���û��raw_pkt����, Ҳû��stream_info, Ϊ����ǰ����, �����Ľӿ�, ����ȫ�ֱ������ڴ洢��ǰ�� */ - void *dup_pkt_distinguish_handle; /* ����������ڿ���?, inject, ipv4_tcp, ipv4_udp, ��ͬʹ��һ��bloom���? */ + void *dup_pkt_distinguish_handle; /* ����������ڿ���?, inject, ipv4_tcp, ipv4_udp, ��ͬʹ��һ��bloom���? */ volatile int destory_env_done; //0:do nothing; 1:doing; 2:done sapp_mem_used_stat_t mem_used_stat; }__attribute__ ((aligned (64))); /* for multi-thread, must 64byte alignment */ @@ -476,16 +478,16 @@ typedef struct{ sapp_cmd_args_val_t **cmd_args_array; int cmd_args_num; char **dumpfie_list_array; /* ����--dumpfile-list����ִ�к�, �õ����ļ��б�, �ַ���ָ������ */ - int slient_mode; /* ��Ĭģʽ, ǰ̨������κ����? */ + int slient_mode; /* ��Ĭģʽ, ǰ̨������κ����? */ }sapp_cla_t; typedef struct{ sapp_cla_t cla; sapp_config_t config; - sapp_gval_individual_fixed_t individual_fixed; /* ��ʼ�����ٸı�ı���? */ - sapp_gval_individual_volatile_t *individual_volatile; /* �����������ʵʱ���µĶ�������? */ - sapp_gval_mthread_t *mthread_volatile[SAPP_MAX_THREADS]; /* �����������ʵʱ���µĶ��̱߳���? */ + sapp_gval_individual_fixed_t individual_fixed; /* ��ʼ�����ٸı�ı���? 
*/ + sapp_gval_individual_volatile_t *individual_volatile; /* �����������ʵʱ���µĶ�������? */ + sapp_gval_mthread_t *mthread_volatile[SAPP_MAX_THREADS]; /* �����������ʵʱ���µĶ��̱߳���? */ }sapp_global_t; extern embed_layer_t g_stream_compare_layer_set; diff --git a/include/private/sapp_mem.h b/include/private/sapp_mem.h index d68a0ac..b6b6ca8 100644 --- a/include/private/sapp_mem.h +++ b/include/private/sapp_mem.h @@ -46,14 +46,14 @@ typedef enum{ SAPP_MEM_DYN_PADDR, /* 每层地址所占的内存, 就不分具体协议了, 太麻烦了!! */ SAPP_MEM_DYN_DETAIN_PKT, SAPP_MEM_DYN_SID_LIST, - + SAPP_MEM_DYN_BLOOM_FILTER, __SAPP_MEM_TYPE_MAX, }sapp_mem_type_t; -void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size); -void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, int size); +void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, unsigned int size); +void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, unsigned int size); void sapp_mem_free(sapp_mem_type_t type, int thread_seq, void *data); -void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, int size); +void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, unsigned int size); void sapp_mem_stat_output(void); #ifndef SAPP_FREE @@ -69,6 +69,8 @@ void sapp_mem_stat_output(void); #define SAPP_GLOBAL_FREE(mem) do{if(mem){sapp_mem_free(SAPP_MEM_FIX_GLOBAL_VAL, MEM_STAT_GLOBAL_THREAD_ID, (void *)mem); mem = NULL;}}while(0) #endif +int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_bytes); + #ifdef __cplusplus } #endif diff --git a/include/private/stream_internal.h b/include/private/stream_internal.h index 90030b5..c57f5f0 100644 --- a/include/private/stream_internal.h +++ b/include/private/stream_internal.h @@ -523,8 +523,7 @@ void packet_io_device_alias_destroy(void); void sapp_dup_pkt_destroy(void); const char *stream_bridge_id_to_name(int bridge_id); void sapp_printf(const char *fmt, ...); - - +int sapp_mem_init(void); #ifdef __cplusplus } diff --git 
a/module_test/src/CMakeLists.txt b/module_test/src/CMakeLists.txt index a5ecba4..d041dad 100644 --- a/module_test/src/CMakeLists.txt +++ b/module_test/src/CMakeLists.txt @@ -7,6 +7,8 @@ include_directories(${PROJECT_SOURCE_DIR}/include/private) include_directories(${PROJECT_SOURCE_DIR}/include/public) include_directories(${PROJECT_SOURCE_DIR}/include/support) include_directories(${PROJECT_SOURCE_DIR}/include/public/stream_inc) +include_directories(${PROJECT_SOURCE_DIR}/src/support/dablooms/src) +include_directories(${PROJECT_SOURCE_DIR}/src/support/ap_bloom/src) add_definitions(-DSAPP_V4=1) add_definitions(-fPIC) @@ -14,9 +16,9 @@ add_definitions(-fPIC) add_executable(gtest_sapp_v4 gtest_main.cpp gtest_sapp_ipv4.cpp gtest_sapp_ipv6.cpp gtest_sapp_tcp.cpp gtest_sapp_udp.cpp gtest_sapp_comm.cpp gtest_sapp_support.cpp gtest_sapp_plug_ctrl.cpp gtest_sapp_tunnel.cpp gtest_transparent_run.cpp gtest_sapp_jump_layer.cpp gtest_inline_run.cpp gtest_sapp_asymmetric.cpp gtest_sapp_inject.cpp gtest_sapp_app_state.cpp gtest_mpls.cpp gtest_vlan.cpp gtest_fake_marsio_run.cpp gtest_sapp_proxy.cpp gtest_sapp_pkt_dump.cpp - ../test_case/auto_gen_test_functions.cpp ../test_case/test_function_common.cpp) + ../test_case/auto_gen_test_functions.cpp ../test_case/test_function_common.cpp gtest_sapp_bloom.cpp) -target_link_libraries(gtest_sapp_v4 gtest-static ${SAPP_DEPEND_DYN_LIB} sapp_benchmark sapp_devel) +target_link_libraries(gtest_sapp_v4 gtest-static ${SAPP_DEPEND_DYN_LIB} sapp_benchmark sapp_devel libdabloom) add_executable(transparent_test_sapp_v4 gtest_transparent_env.cpp gtest_sapp_comm.cpp) target_link_libraries(transparent_test_sapp_v4 pthread pcap m dl MESA_jump_layer MESA_handle_logger) diff --git a/module_test/src/gtest_main.cpp b/module_test/src/gtest_main.cpp index 913e1ef..6c0e475 100644 --- a/module_test/src/gtest_main.cpp +++ b/module_test/src/gtest_main.cpp @@ -2211,6 +2211,17 @@ TEST(performance, simple) #endif +#include "sapp_global_val.h" + +TEST(dabloom, simple) +{ 
+ ASSERT_EQ(0, sapp_bloom_filter_test_run(BLOOM_LIBRARY_DABLOOM)); +} + +TEST(apbloom, simple) +{ + ASSERT_EQ(0, sapp_bloom_filter_test_run(BLOOM_LIBRARY_APBLOOM)); +} static const char *gtest_cla_short_options = "hvLsf:l:"; diff --git a/module_test/src/gtest_sapp_bloom.cpp b/module_test/src/gtest_sapp_bloom.cpp new file mode 100644 index 0000000..8f49c64 --- /dev/null +++ b/module_test/src/gtest_sapp_bloom.cpp @@ -0,0 +1,269 @@ +#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/udp.h>
+#include <assert.h>
+#include <time.h>
+#include <arpa/inet.h>
+#include "stream.h"
+#include <sys/types.h> /* See NOTES */
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <errno.h>
+#include "gtest_sapp_fun.h"
+#include <gtest/gtest.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <time.h>
+#include <pthread.h>
+#include "dablooms.h"
+#include "sapp_global_val.h"
+
+#ifdef __cplusplus /* give the prototypes below C linkage when this file is built as C++ */
+extern "C"
+{
+#endif
+ void bloom_free_partition(void *bloom_filter, const sapp_dup_pkt_t *dup_conf); /* used in this file to release a handle obtained from bloom_new_partition() */
+ void *bloom_new_partition(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms); /* returns the opaque filter handle passed to the other three calls; this file treats NULL as failure */
+ int bloom_check_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms); /* this file treats a return value > 0 as "key is present" */
+ void bloom_add_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms); /* inserts key_len bytes of key; NOTE(review): now/now_ms presumably drive expiry - confirm in the implementation */
+#ifdef __cplusplus
+}
+#endif
+
+static const unsigned int BM_CAPACITY = 200000; // copied into dup_conf.bloom_capacity by bm_test()
+static const double BM_ERROR_RATE = 0.000001; // configured error rate; also the pass/fail threshold for false positives in bm_test()
+static const int BM_TIMEOUT = 10 * 1000; // milliseconds; copied into dup_conf.bloom_timeout_ms
+static const int BM_PARTITION_NUM = 64; // copied into dup_conf.bloom_partition_num
+static const long BM_TRANSITION_TIME = 2 * 1000; // milliseconds; copied into dup_conf.transition_time_ms
+// Workload size: bm_test() adds, then looks up, MAX_ITEM_NUM keys in batches of ITEM_BATCH_NUM.
+static const int MAX_ITEM_NUM = 10000000;
+static const int ITEM_BATCH_NUM = 1000;
+#define TUPLE4_ADDR_LEN (12 + 1) // 12 key bytes (sip 4 + dip 4 + sport 2 + dport 2) plus the leading 'Y'/'N' tag byte
+#define BM_TEST_MAX_THREAD (1) // number of worker threads started by sapp_bloom_filter_test_run()
+static pthread_t bm_test_thread_id[BM_TEST_MAX_THREAD];
+static pthread_t timer_thread_id;
+static volatile struct timespec g_current_time_sp = {}; // written by timer_thread(), read by the workers; NOTE(review): volatile gives no atomicity, so tv_sec/tv_nsec can be observed torn
+static volatile long long g_current_time_in_ms = 0; // g_current_time_sp expressed in milliseconds, refreshed by timer_thread()
+static struct timeval g_current_time_tv; // per-call snapshot of g_current_time_sp in timeval form, filled by bm_add_item()/bm_search_item()
+static volatile int timer_thread_run = 1; // loop condition of timer_thread(); cleared by sapp_bloom_filter_test_run() when the workers are done
+// Base values of the synthetic 4-tuple; bm_update_key() adds the item index to each of them.
+static const unsigned int INIT_SIP = 0x12345678;
+static const unsigned int INIT_DIP = 0x87654321;
+static const unsigned short INIT_SPORT = 0x1234;
+static const unsigned short INIT_DPORT = 0x4321;
+
+// Fill bytes 1..12 of tuple4_buf with a synthetic IPv4 4-tuple derived from index.
+// Layout (host byte order): [1..4] sip, [5..8] dip, [9..10] sport, [11..12] dport.
+// Byte 0 (the 'Y'/'N' tag chosen by the caller) is left untouched.
+// tuple4_buf must provide at least TUPLE4_ADDR_LEN writable bytes.
+static inline void bm_update_key(char *tuple4_buf, unsigned int index)
+{
+    const unsigned int sip = INIT_SIP + index;
+    const unsigned int dip = INIT_DIP + index;
+    // The ports wrap modulo 65536, exactly as the former 16-bit stores did.
+    const unsigned short sport = (unsigned short)(INIT_SPORT + index);
+    const unsigned short dport = (unsigned short)(INIT_DPORT + index);
+
+    // All field offsets are odd, so casting &tuple4_buf[n] to a wider pointer type
+    // yields misaligned stores that also violate the aliasing rules. memcpy writes
+    // the very same bytes without either problem.
+    memcpy(&tuple4_buf[1], &sip, sizeof(sip));
+    memcpy(&tuple4_buf[5], &dip, sizeof(dip));
+    memcpy(&tuple4_buf[9], &sport, sizeof(sport));
+    memcpy(&tuple4_buf[11], &dport, sizeof(dport));
+}
+
+static unsigned long long max_add_time = 0; // microseconds; slowest single add measured by bm_add_item()
+static unsigned long long total_add_time = 0; // microseconds; sum over all adds measured by bm_add_item()
+static unsigned long long max_search_time = 0; // microseconds; slowest single lookup measured by bm_search_item()
+static unsigned long long total_search_time = 0; // microseconds; sum over all lookups measured by bm_search_item()
+
+// Scratch timestamps shared by bm_add_item() and bm_search_item(); being file-level statics they are only safe with a single worker thread.
+static struct timespec start_time, end_time;
+
+// return us
+// Elapsed time from *start_time to *end_time in whole microseconds.
+// Returns 0 when end_time is not later than start_time. The previous unsigned
+// arithmetic wrapped to a huge value in that case (e.g. after a clock step),
+// which then poisoned the max/total statistics.
+static inline unsigned long long bm_time_diff(const struct timespec *start_time, const struct timespec *end_time)
+{
+    // Work in signed nanoseconds so a negative interval is detectable.
+    const long long delta_ns = ((long long)end_time->tv_sec - (long long)start_time->tv_sec) * 1000000000LL
+                             + ((long long)end_time->tv_nsec - (long long)start_time->tv_nsec);
+    if (delta_ns <= 0)
+    {
+        return 0;
+    }
+    return (unsigned long long)delta_ns / 1000;
+}
+
+// Insert the key for `index` into the filter and record how long the call took.
+// tuple4_buf[0] (the 'Y'/'N' tag) must already be set by the caller; the remaining
+// key bytes are regenerated here. Updates max_add_time / total_add_time (microseconds).
+static void bm_add_item(void *bloom_filter, char *tuple4_buf, unsigned int index, const sapp_dup_pkt_t *dup_conf)
+{
+    bm_update_key(tuple4_buf, index);
+
+    // Intervals are measured on the monotonic clock: CLOCK_REALTIME can be stepped
+    // by NTP or an administrator, which would corrupt the statistics.
+    clock_gettime(CLOCK_MONOTONIC, &start_time);
+    // The filter itself is fed the wall-clock time published by timer_thread().
+    g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
+    g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+    bloom_add_partition(bloom_filter, tuple4_buf, TUPLE4_ADDR_LEN, dup_conf, g_current_time_tv, g_current_time_in_ms);
+    clock_gettime(CLOCK_MONOTONIC, &end_time);
+
+    const unsigned long long elapsed_us = bm_time_diff(&start_time, &end_time);
+    if (elapsed_us > max_add_time)
+    {
+        max_add_time = elapsed_us;
+    }
+    total_add_time += elapsed_us;
+}
+
+// Look up the key for `index` in the filter and record how long the call took.
+// tuple4_buf[0] (the 'Y'/'N' tag) must already be set by the caller.
+// Returns the raw result of bloom_check_partition(); callers in this file treat
+// a value > 0 as "present". Updates max_search_time / total_search_time (microseconds).
+static int bm_search_item(void *bloom_filter, char *tuple4_buf, unsigned int index, const sapp_dup_pkt_t *dup_conf)
+{
+    bm_update_key(tuple4_buf, index);
+
+    // Monotonic clock for the interval; CLOCK_REALTIME may jump while we measure.
+    clock_gettime(CLOCK_MONOTONIC, &start_time);
+    // The filter itself is fed the wall-clock time published by timer_thread().
+    g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
+    g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+    const int ret = bloom_check_partition(bloom_filter, tuple4_buf, TUPLE4_ADDR_LEN, dup_conf, g_current_time_tv, g_current_time_in_ms);
+    clock_gettime(CLOCK_MONOTONIC, &end_time);
+
+    // Kept unsigned like bm_time_diff()'s return type and the accumulators, so the
+    // comparison below involves no signed/unsigned conversion.
+    const unsigned long long elapsed_us = bm_time_diff(&start_time, &end_time);
+    if (elapsed_us > max_search_time)
+    {
+        max_search_time = elapsed_us;
+    }
+    total_search_time += elapsed_us;
+    return ret;
+}
+
+// Run one add/lookup benchmark against the partitioned bloom filter.
+//
+// bloom: value of enum bloom_library selecting the backend under test.
+//
+// The test inserts MAX_ITEM_NUM 'Y'-tagged keys, looks every one of them up again
+// (each miss is an error) and looks up the same number of 'N'-tagged keys that were
+// never inserted (each hit is a false positive).
+//
+// Returns 0 when no inserted key was missed and the false-positive rate does not
+// exceed BM_ERROR_RATE, non-zero otherwise (including filter creation failure).
+static int bm_test(int bloom)
+{
+    sapp_dup_pkt_t dup_conf = {};
+    dup_conf.bloom_capacity = BM_CAPACITY;
+    dup_conf.bloom_error_rate = BM_ERROR_RATE;
+    dup_conf.bloom_timeout_ms = BM_TIMEOUT;
+    dup_conf.bloom_library = (enum bloom_library)bloom;
+    dup_conf.kickout_udp_stream_enabled = 1;
+    dup_conf.dup_pkt_distinguish_all_inject = 1;
+    dup_conf.dup_pkt_distinguish_ipv4_tcp = 1;
+    dup_conf.dup_pkt_distinguish_ipv4_udp = 1;
+    dup_conf.bloom_partition_num = BM_PARTITION_NUM;
+    dup_conf.transition_time_ms = BM_TRANSITION_TIME;
+
+    // The accumulators are file-level statics and this function runs once per
+    // backend; without a reset the second run would report averages and maxima
+    // that include the first run.
+    max_add_time = 0;
+    total_add_time = 0;
+    max_search_time = 0;
+    total_search_time = 0;
+
+    // Refresh the timeval snapshot so the first filter calls see the same clock
+    // value as g_current_time_in_ms instead of a zero or left-over timeval.
+    g_current_time_tv.tv_sec = g_current_time_sp.tv_sec;
+    g_current_time_tv.tv_usec = g_current_time_sp.tv_nsec / 1000;
+
+    void *dabm_handle = bloom_new_partition(&dup_conf, g_current_time_tv, g_current_time_in_ms);
+    if (dabm_handle == NULL)
+    {
+        // Explicit check: assert() is compiled out under NDEBUG and would let a
+        // NULL handle reach the filter calls below.
+        fprintf(stderr, "bloom_new_partition() failed!\n");
+        return -1;
+    }
+
+    char tuple4_buf[1024] = {};
+    long long search_y_error_num = 0;
+    long long search_n_error_num = 0;
+
+    // Smoke test: a key must be found immediately after it has been added.
+    tuple4_buf[0] = 'Y';
+    bm_update_key(tuple4_buf, 0);
+
+    bloom_add_partition(dabm_handle, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+    int hit = bloom_check_partition(dabm_handle, tuple4_buf, TUPLE4_ADDR_LEN, &dup_conf, g_current_time_tv, g_current_time_in_ms);
+    if (hit <= 0)
+    {
+        fprintf(stderr, "not found tuple4!\n");
+        bloom_free_partition(dabm_handle, &dup_conf);
+        return -1;
+    }
+
+    printf("starting test, capacity:%u, timeout:%dms, partition:%d, batch_num:%d, max-items:%d\n",
+           BM_CAPACITY, BM_TIMEOUT, BM_PARTITION_NUM, ITEM_BATCH_NUM, MAX_ITEM_NUM);
+
+    int ret;
+    int add_index = 0, search_index_y = 0, search_index_n = 0;
+    while (add_index < MAX_ITEM_NUM || search_index_y < MAX_ITEM_NUM || search_index_n < MAX_ITEM_NUM)
+    {
+        // Phase 1: insert the next batch of 'Y' keys.
+        tuple4_buf[0] = 'Y';
+        for (int b = 0; b < ITEM_BATCH_NUM && add_index < MAX_ITEM_NUM; b++, add_index++)
+        {
+            bm_add_item(dabm_handle, tuple4_buf, add_index, &dup_conf);
+        }
+
+        // Phase 2: every key inserted so far must be reported as present.
+        tuple4_buf[0] = 'Y';
+        for (int b = 0; b < ITEM_BATCH_NUM && search_index_y < MAX_ITEM_NUM; b++, search_index_y++)
+        {
+            ret = bm_search_item(dabm_handle, tuple4_buf, search_index_y, &dup_conf);
+            if (ret <= 0) // expect exist
+            {
+                search_y_error_num++;
+            }
+        }
+
+        // Phase 3: 'N' keys were never inserted; a hit is a false positive.
+        tuple4_buf[0] = 'N';
+        for (int b = 0; b < ITEM_BATCH_NUM && search_index_n < MAX_ITEM_NUM; b++, search_index_n++)
+        {
+            ret = bm_search_item(dabm_handle, tuple4_buf, search_index_n, &dup_conf);
+            if (ret > 0) // expect not exist
+            {
+                search_n_error_num++;
+            }
+        }
+    }
+
+    printf("add %lld items success, avg-time:%.3fus, max:%lluus\n",
+           (long long)MAX_ITEM_NUM, (double)total_add_time / (double)MAX_ITEM_NUM, max_add_time);
+
+    printf("search %lld items , avg-time:%.3fus, max:%lluus\n",
+           (long long)MAX_ITEM_NUM, (double)total_search_time / (double)MAX_ITEM_NUM, max_search_time);
+
+    double err_rate = (double)search_n_error_num / (double)MAX_ITEM_NUM;
+    printf("search_y_error_num:%lld \nsearch_n_error_num:%lld, errer-rate:%f\n",
+           search_y_error_num, search_n_error_num, err_rate);
+
+    bloom_free_partition(dabm_handle, &dup_conf);
+
+    // Collapse to an explicit status instead of implicitly narrowing a long long
+    // count into the int return value.
+    return (search_y_error_num != 0 || err_rate > (double)BM_ERROR_RATE) ? -1 : 0;
+}
+
+// pthread entry point for one benchmark worker.
+// arg carries the bloom library id packed into the pointer value. The thread's
+// exit value is the string literal "success" when bm_test() returned 0 and
+// "error" otherwise.
+static void *bm_test_thread(void *arg)
+{
+    const int library_id = (int)(long)arg;
+    const int status = bm_test(library_id);
+    return (status == 0) ? (void *)"success" : (void *)"error";
+}
+
+// Current CLOCK_MONOTONIC reading in microseconds.
+static inline long long get_curtime_in_us(void)
+{
+    // Zero-initialised so a failing clock_gettime() yields 0 rather than garbage.
+    struct timespec curts = {0, 0};
+    clock_gettime(CLOCK_MONOTONIC, &curts);
+    // Widen before multiplying: tv_sec * 1000000 evaluated in time_t width
+    // overflows where time_t is 32 bits.
+    return (long long)curts.tv_sec * 1000000LL + curts.tv_nsec / 1000;
+}
+
+// Suspend the calling thread for ns nanoseconds (seconds part is always 0).
+// The result of nanosleep() is ignored, so an interrupted sleep is not resumed.
+static inline void nssleep(long ns)
+{
+    struct timespec pause_for;
+    pause_for.tv_sec = 0;
+    pause_for.tv_nsec = ns;
+    nanosleep(&pause_for, NULL);
+}
+
+// pthread entry point that keeps the shared wall-clock values fresh.
+// While timer_thread_run is non-zero it publishes CLOCK_REALTIME into
+// g_current_time_sp and the same instant in milliseconds into
+// g_current_time_in_ms, pausing 10 ns between updates. arg is unused.
+static void *timer_thread(void *arg)
+{
+    (void)arg;
+    while (timer_thread_run)
+    {
+        // Read into a local first: handing clock_gettime() a pointer with the
+        // volatile qualifier cast away and letting it write the volatile object
+        // is undefined behaviour.
+        struct timespec now = {0, 0};
+        clock_gettime(CLOCK_REALTIME, &now);
+        g_current_time_sp.tv_sec = now.tv_sec;
+        g_current_time_sp.tv_nsec = now.tv_nsec;
+        // Widen before multiplying so the product cannot overflow a 32-bit time_t.
+        g_current_time_in_ms = (long long)now.tv_sec * 1000 + now.tv_nsec / 1000000;
+        nssleep(10);
+    }
+    return NULL;
+}
+
+// Entry point used by the gtest cases: run the bloom filter benchmark for the
+// given backend (a value of enum bloom_library) on BM_TEST_MAX_THREAD workers,
+// with a helper thread publishing the current time.
+// Returns 0 when every worker reported success, -1 otherwise.
+int sapp_bloom_filter_test_run(int bloom)
+{
+    int ret = 0;
+    int worker_started[BM_TEST_MAX_THREAD] = {};
+
+    // Re-arm the flag on every call: it is cleared at the end of a run, so without
+    // this the timer thread of a second invocation exits at once and the workers
+    // run against a frozen clock.
+    timer_thread_run = 1;
+    if (pthread_create(&timer_thread_id, NULL, timer_thread, NULL) != 0)
+    {
+        printf("create timer thread failed\n");
+        return -1;
+    }
+
+    for (int i = 0; i < BM_TEST_MAX_THREAD; i++)
+    {
+        if (pthread_create(&bm_test_thread_id[i], NULL, bm_test_thread, (void *)(long)bloom) != 0)
+        {
+            printf("thread %d create failed\n", i);
+            ret = -1;
+            continue;
+        }
+        worker_started[i] = 1;
+    }
+
+    for (int i = 0; i < BM_TEST_MAX_THREAD; i++)
+    {
+        if (!worker_started[i])
+        {
+            continue; // never joined: the thread id is not valid
+        }
+
+        void *thread_result = NULL;
+        int joined = pthread_join(bm_test_thread_id[i], &thread_result);
+        // Compare the text, not the pointer: whether two identical string literals
+        // share one address is unspecified.
+        if (joined != 0 || thread_result == NULL || thread_result == PTHREAD_CANCELED
+            || strcmp((const char *)thread_result, "success") != 0)
+        {
+            printf("thread %d test failed\n", i);
+            ret = -1;
+        }
+    }
+
+    // Stop the timer: clear the flag, and cancel as well in case it is sleeping.
+    timer_thread_run = 0;
+    pthread_cancel(timer_thread_id);
+    pthread_join(timer_thread_id, NULL);
+
+    return ret;
+}
diff --git a/module_test/src/gtest_sapp_fun.h b/module_test/src/gtest_sapp_fun.h index 7d4bab7..b40932c 100644 --- a/module_test/src/gtest_sapp_fun.h +++ b/module_test/src/gtest_sapp_fun.h @@ -569,6 +569,7 @@ void sapp_deal_proxy_kill_tcp_run(void); /**************************** pkt_dump *************************************/ int test_pkt_dump_run(void); +int sapp_bloom_filter_test_run(int); void append_entry_list(const char *entryname); int check_sapp_version(void); diff --git a/module_test/src/gtest_sapp_support.cpp b/module_test/src/gtest_sapp_support.cpp index c58dd81..f4f6a75 100644 --- a/module_test/src/gtest_sapp_support.cpp +++ b/module_test/src/gtest_sapp_support.cpp @@ -501,32 +501,20 @@ void control_platform_opt_run(void) void control_stream_create_timestamp_ms_run(void) { - const char *argv[4]; - - argv[0] = (char *)"./sapp "; - argv[1] = (char *)"--dumpfile"; - argv[2] = (char *)"--dumpfile-speed=timestamp"; - argv[3] = NULL; - set_default_config(); set_project_list_conf_default(); update_config_file("etc/sapp.toml", "syn_mandatory", "0"); update_config_file("etc/sapp.toml", "timeout", "0"); update_config_file_by_lastline("etc/sapp.toml", "packet_io.polling","enabled", "1"); - update_plugin_inf("TCP_ALL", "stream_createtime_ms_tcpall_entry"); - /* - ����һ��pcap��, ʹ�����dz���, ʹ��--dumpfile-speed=timestamp����,��֤ƽ̨��ʱ�䲻�˳�! - �����pcap_onlineģʽ����lo��,�dz�����û������,���²�����ᱻ����. 
- */ + update_plugin_inf("IP", "iplayer_readjust_time_entry"); + append_plugin_inf("TCP_ALL", "stream_createtime_ms_tcpall_entry"); + set_pcap_dumpfile("tcp/tcp_simple.pcap"); ASSERT_EQ(file_md5_checksum("dumpfile", "df138740a6a22ca9c977052f21f7a470"),0); - printf("test for stream_create_timestamp_ms, set --dumpfile-speed=timestamp please wait for a monment......\n"); - - call_libsapp_devel_with_args(3, argv); - + call_libsapp_devel_for_dumpfile_topspeed(); } diff --git a/module_test/src/gtest_sapp_support_plug.cpp b/module_test/src/gtest_sapp_support_plug.cpp index 19158b1..371d969 100644 --- a/module_test/src/gtest_sapp_support_plug.cpp +++ b/module_test/src/gtest_sapp_support_plug.cpp @@ -1033,8 +1033,6 @@ extern "C" char stream_createtime_ms_tcpall_entry(struct streaminfo *pstream,voi int opt_len = sizeof(long long); if(OP_STATE_PENDING == pstream->pktstate){ - sleep(1); - usleep(123456); ret = MESA_get_stream_opt(pstream, MSO_STREAM_CREATE_TIMESTAMP_MS, &create_time_ms_pending, &opt_len); if(ret < 0){ printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_CREATE_TIMESTAMP_MS error!\033[0m\n"); @@ -1069,20 +1067,13 @@ extern "C" char stream_createtime_ms_tcpall_entry(struct streaminfo *pstream,voi return APP_STATE_DROPME; } - if(((pstream->ptcpdetail->createtime != create_time_ms_pending/1000) - && (pstream->ptcpdetail->createtime != create_time_ms_pending/1000 - 1))/* 防止在执行的瞬间跨过了一秒 */ - || pstream->ptcpdetail->createtime != create_time_s){ - printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_CREATE_TIMESTAMP_MS is:%llu, but current time is:%llu\033[0m\n", create_time_ms_pending, pstream->ptcpdetail->createtime); - sendto_test_result(GTEST_SAPP_ERR); - return APP_STATE_DROPME; - } ret = MESA_get_stream_opt(pstream, MSO_STREAM_LASTUPDATE_TIMESTAMP_MS, &lastm_time_ms2, &opt_len); if(ret < 0){ printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_LASTUPDATE_TIMESTAMP_MS error!\033[0m\n"); sendto_test_result(GTEST_SAPP_ERR); return APP_STATE_DROPME; } - 
if(lastm_time_ms2 == lastm_time_ms1){ + if(lastm_time_ms2 <= lastm_time_ms1){ printf("\033[1;31;40mMESA_get_stream_opt()->MSO_STREAM_LASTUPDATE_TIMESTAMP_MS pending:%llu diff with close:%llu\033[0m\n", lastm_time_ms1, lastm_time_ms2); sendto_test_result(GTEST_SAPP_ERR); return APP_STATE_DROPME; diff --git a/src/common/sapp_mem.c b/src/common/sapp_mem.c index e921de0..aeb6b36 100644 --- a/src/common/sapp_mem.c +++ b/src/common/sapp_mem.c @@ -49,9 +49,10 @@ static const mem_stat_name_tuple_t mem_stat_name_tuple[] = {SAPP_MEM_DYN_PADDR , "layer_addr"}, {SAPP_MEM_DYN_DETAIN_PKT , "detain_pkt"}, {SAPP_MEM_DYN_SID_LIST , "segment_id_list"}, + {SAPP_MEM_DYN_BLOOM_FILTER , "bloom_filter"}, }; -void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size) +void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, unsigned int size) { char *ptr; sapp_private_mem_t *mhdr; @@ -82,7 +83,7 @@ void *sapp_mem_malloc(sapp_mem_type_t type, int thread_seq, int size) } -void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, int size) +void *sapp_mem_calloc(sapp_mem_type_t type, int thread_seq, unsigned int size) { char *ptr; sapp_private_mem_t *mhdr; @@ -137,7 +138,7 @@ void sapp_mem_free(sapp_mem_type_t type, int thread_seq, void *data) free((void *)mhdr); } -void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, int size) +void *sapp_mem_realloc(sapp_mem_type_t type, int thread_seq, void *old_ptr, unsigned int size) { void *new_ptr; sapp_private_mem_t *mhdr; @@ -258,7 +259,7 @@ void sapp_mem_stat_output(void) } for(stat_index = __SAPP_FIX_DYN_SEPARATOR + 1 ; stat_index < __SAPP_MEM_TYPE_MAX; stat_index++){ - if(dyn_realtime_stat.mem_used_block[stat_index] > 0){ + if(dyn_realtime_stat.mem_used_block[stat_index] != 0){ fprintf(fp, "%7s %18s %18lld %18lld %18s\n", "", mem_stat_name_tuple[stat_index].mem_type_string, @@ -287,6 +288,37 @@ void sapp_mem_stat_output(void) fclose(fp); } +char bloomfilter_mem_stat_polling_entry(struct streaminfo *nouse1, 
void **nouse2, int thread_seq, void *nouse3) +{ + static time_t last_get_time[SAPP_MAX_THREADS] = {}; + + if(ABBR_CURRENT_TIME <= last_get_time[thread_seq]) + { + return POLLING_STATE_IDLE; + } + + long long blocks = 0, bytes = 0; + bloomfilter_get_mem_stat(thread_seq, &blocks, &bytes); + + sapp_global_val->mthread_volatile[thread_seq]->mem_used_stat.mem_used_block[SAPP_MEM_DYN_BLOOM_FILTER] = blocks; + sapp_global_val->mthread_volatile[thread_seq]->mem_used_stat.mem_used_bytes[SAPP_MEM_DYN_BLOOM_FILTER] = bytes; + + last_get_time[thread_seq] = ABBR_CURRENT_TIME; + return POLLING_STATE_WORK; +} + +int sapp_mem_init(void) +{ + if(sapp_global_val->config.packet_io.dup_pkt_para.kickout_udp_stream_enabled + || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_tcp + || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_udp + || sapp_global_val->config.packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject) + { + stream_register_fun(FUN_TYPE_POLLING, (char (*)(void))bloomfilter_mem_stat_polling_entry, 0); + } + return 0; +} + #ifdef __cplusplus } #endif diff --git a/src/config/config_parse.cpp b/src/config/config_parse.cpp index 6b72ea4..a6d1277 100644 --- a/src/config/config_parse.cpp +++ b/src/config/config_parse.cpp @@ -1319,6 +1319,22 @@ static int config_sanity_check(void) return -1; } + if (pconfig->packet_io.dup_pkt_para.kickout_udp_stream_enabled + || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_tcp + || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_ipv4_udp + || pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject) + { + if (pconfig->packet_io.dup_pkt_para.bloom_partition_num <= 0) + { + sapp_log(RLOG_LV_FATAL, ~0, ~0, "bloom_partition_num is:%d, must be greater than 0 !", pconfig->packet_io.dup_pkt_para.bloom_partition_num); + return -1; + } + + if(pconfig->packet_io.dup_pkt_para.transition_time_ms > pconfig->packet_io.dup_pkt_para.bloom_timeout_ms/2){ + sapp_log(RLOG_LV_INFO, ~0, ~0, 
"bloom_transition_time is greater than bloom_timeout/2\n"); + } + } + /******************************* CPU ********************************/ cur_cpu_num = get_nprocs(); if(cur_cpu_num < pconfig->cpu.worker_threads){ @@ -1735,13 +1751,18 @@ int sapp_parse_config(void) tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.traffic.inject", (char *)"inject_all_enabled", &pconfig->packet_io.dup_pkt_para.dup_pkt_distinguish_all_inject, 0); tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_capacity", &pconfig->packet_io.dup_pkt_para.bloom_capacity, 1000000); - tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_timeout", &pconfig->packet_io.dup_pkt_para.bloom_timeout, 60); + int tmp_dm_timeout; + tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_timeout", &tmp_dm_timeout, 60 * 1000); + pconfig->packet_io.dup_pkt_para.bloom_timeout_ms = (long)tmp_dm_timeout * 1000; + tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_transition_time", &tmp_dm_timeout, 3); + pconfig->packet_io.dup_pkt_para.transition_time_ms = (long)tmp_dm_timeout * 1000; + tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_slice_num", &pconfig->packet_io.dup_pkt_para.bloom_slice_num, 3); tomlc99_wrap_load_string_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_error_rate", str_tmp, sizeof(str_tmp), "0.000001"); tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"bloom_library", (int *)&pconfig->packet_io.dup_pkt_para.bloom_library, 1); tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char *)"first_packets", (int *)&pconfig->packet_io.dup_pkt_para.first_packets, 3); + tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"dup_pkt.parameters", (char 
*)"bloom_partition_num", (int *)&pconfig->packet_io.dup_pkt_para.bloom_partition_num, 1); pconfig->packet_io.dup_pkt_para.bloom_error_rate = strtod(str_tmp, NULL); - /******************************* packet_io.under_ddos ******************************/ tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"packet_io.under_ddos", (char *)"stream_bypass_enabled", &pconfig->packet_io.under_ddos_config.enabled, 0); //��ǰ����, Ĭ�ϲ����� tomlc99_wrap_load_int_def(ABBR_CFG_FILE_MAIN_ENTRY, (char *)"packet_io.under_ddos", (char *)"get_cpu_usage_interval", &pconfig->packet_io.under_ddos_config.get_cpu_usage_interval, 500); diff --git a/src/dealpkt/duplicate_pkt_distinguish.c b/src/dealpkt/duplicate_pkt_distinguish.c index 1a904b6..fa7a64c 100644 --- a/src/dealpkt/duplicate_pkt_distinguish.c +++ b/src/dealpkt/duplicate_pkt_distinguish.c @@ -12,35 +12,41 @@ extern "C" { 识别因路由策略导致的重复流量, 首次收到数据包加入bloom filter, 每个流的前N个包扫描bloom filter, 重复的数据包直接转发即可. - ip层的ttl, hop不能算为key, 因为经过了几跳路由器又回绕了, ttl肯定会变化, 而上层的数据包确实是重复的. + ip层的ttl, hop不能算为key, 因为经过了几跳路由器又回绕了, ttl肯定会变�?, 而上层的数据包确实是重复�?. */ /* 2021-05-18 lijia close ipv6 protocol: 重复流量识别bloom filter句柄, 根据流量方向和协议的不同, 功能分为三类, - IPv6因为没有Ipid字段, 无法区别真正的应用层重传和重复流量, 开启了会导致断网或者CT, 所以不支持!!! + IPv6因为没有Ipid字段, 无法区别真正的应用层重传和重复流�?, 开启了会导致断网或者CT, 所以不支持!!! - 对于IPv6的两种情况: - 1)如果开启了代理策略, 重传包被识别成了重复包的话, sapp就直接PASS了, 实际那个包是真实通信双方发的, 没有经过tfe处理, 可能就断网了. - 2)如果开启了firewall的drop策略, 第一次drop成功了, 但应用层会重传, 重传包被识别成了重复包的话, sapp就直接PASS了, 导致CT. + 对于IPv6的两种情�?: + 1)如果开启了代理策略, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 实际那个包是真实通信双方发的, 没有经过tfe处理, 可能就断网了. + 2)如果开启了firewall的drop策略, 第一次drop成功�?, 但应用层会重�?, 重传包被识别成了重复包的�?, sapp就直接PASS�?, 导致CT. 
*/ +/* 24.04, split bloomfilter into multiple partition to avoid blocking caused by memset large memory */ +struct bloom_partition +{ + void **bm_handle; + int bm_partition_num; +}; -static inline void *bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now) +static inline void *_bloom_new(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) { switch(dup_conf->bloom_library) { case BLOOM_LIBRARY_APBLOOM: - return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout*1000, dup_conf->bloom_slice_num); + return AP_bloom_new(now, dup_conf->bloom_error_rate, dup_conf->bloom_capacity, dup_conf->bloom_timeout_ms, dup_conf->bloom_slice_num); case BLOOM_LIBRARY_DABLOOM: - return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now.tv_sec, dup_conf->bloom_timeout); + return expiry_dablooms_init(dup_conf->bloom_capacity, dup_conf->bloom_error_rate, now_ms, dup_conf->bloom_timeout_ms, dup_conf->transition_time_ms); default: return NULL; } } -static inline void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf) +static inline void _bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf) { switch(dup_conf->bloom_library) { @@ -54,7 +60,56 @@ static inline void bloom_free(void *bloom_filter, const sapp_dup_pkt_t *dup_conf return; } -static inline int bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now) +void bloom_free_partition(void *bloom_filter, const sapp_dup_pkt_t *dup_conf) +{ + struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter; + int i; + for(i = 0; i < bm_partition->bm_partition_num; i++){ + if(bm_partition->bm_handle[i]){ + _bloom_free(bm_partition->bm_handle[i], dup_conf); + } + } + free(bm_partition); + return; +} + +void *bloom_new_partition(const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) +{ + int i; + struct bloom_partition *bm_partition = (struct 
bloom_partition *)calloc(1, sizeof(struct bloom_partition)); + bm_partition->bm_handle = (void **)calloc(dup_conf->bloom_partition_num, sizeof(void *)); + + long base_timeout = dup_conf->bloom_timeout_ms; + int polarity = 1; + sapp_dup_pkt_t local_dup_conf = {}; + memcpy(&local_dup_conf, dup_conf, sizeof(sapp_dup_pkt_t)); + srand(time(NULL)); + for(i = 0; i < dup_conf->bloom_partition_num; i++){ + local_dup_conf.bloom_timeout_ms = base_timeout + (random() % 1000) * polarity; // spread out expire time + polarity *= -1; + if(dup_conf->bloom_timeout_ms < 0){ + local_dup_conf.bloom_timeout_ms = base_timeout; + } + bm_partition->bm_handle[i] = _bloom_new(&local_dup_conf, now, now_ms); + if(NULL == bm_partition->bm_handle[i]){ + break; + } + } + if(i != dup_conf->bloom_partition_num){ + for(i = 0; i < dup_conf->bloom_partition_num; i++){ + if(bm_partition->bm_handle[i]){ + _bloom_free(bm_partition->bm_handle[i], dup_conf); + } + } + free(bm_partition); + return NULL; + } + bm_partition->bm_partition_num = dup_conf->bloom_partition_num; + return bm_partition; +} + + +static inline int _bloom_check(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) { switch(dup_conf->bloom_library) { @@ -63,13 +118,29 @@ static inline int bloom_check(void *bloom_filter, const char *key, int key_len, now, (const char *)key, key_len); case BLOOM_LIBRARY_DABLOOM: return expiry_dablooms_search((struct expiry_dablooms_handle *)bloom_filter, - (const char *)key, key_len, (time_t)now.tv_sec); + (const char *)key, key_len, now_ms); default: return 0; } } -static inline void bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now) +static inline unsigned int bloom_parttion_kiss_hash(const unsigned char *key, int key_len) +{ + unsigned int hash = 127; + for(int i = 0; i < key_len; i++){ + hash += (unsigned int )key[i]; + } + return (hash & 0x7ffffff); +} + +int 
bloom_check_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) +{ + struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter; + int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num; + return _bloom_check(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms); +} + +static inline void _bloom_add(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) { switch(dup_conf->bloom_library) { @@ -78,7 +149,7 @@ static inline void bloom_add(void *bloom_filter, const char *key, int key_len, c now, (const char *)key, key_len); case BLOOM_LIBRARY_DABLOOM: expiry_dablooms_add((struct expiry_dablooms_handle *)bloom_filter, - (const char *)key, key_len, (time_t)now.tv_sec); + (const char *)key, key_len, now_ms); break; default: break; @@ -86,6 +157,52 @@ static inline void bloom_add(void *bloom_filter, const char *key, int key_len, c return; } +void bloom_add_partition(void *bloom_filter, const char *key, int key_len, const sapp_dup_pkt_t *dup_conf, struct timeval now, long now_ms) +{ + struct bloom_partition *bm_partition = (struct bloom_partition *)bloom_filter; + int index = bloom_parttion_kiss_hash((unsigned char *)key, key_len) % bm_partition->bm_partition_num; + return _bloom_add(bm_partition->bm_handle[index], key, key_len, dup_conf, now, now_ms); +} + +int bloomfilter_get_mem_stat(int tseq, long long *mem_blocks, long long *mem_bytes) +{ + struct bloom_partition *bm_partition = (struct bloom_partition *)sapp_global_val->mthread_volatile[tseq]->dup_pkt_distinguish_handle; + + if(NULL == bm_partition){ + *mem_blocks = 0; + *mem_bytes = 0; + return 0; + } + + switch (sapp_global_val->config.packet_io.dup_pkt_para.bloom_library) + { + case BLOOM_LIBRARY_APBLOOM: + // todo, + *mem_blocks = 0; + *mem_bytes = 0; + return -1; + break; + case BLOOM_LIBRARY_DABLOOM: + { 
+ long long tmp_blocks = 0; + long long tmp_bytes = 0; + *mem_blocks = 0; + *mem_bytes = 0; + for (int i = 0; i < bm_partition->bm_partition_num; i++) + { + expiry_dablooms_get_mem_stat(bm_partition->bm_handle[i], &tmp_blocks, &tmp_bytes); + *mem_blocks += tmp_blocks; + *mem_bytes += tmp_bytes; + } + } + break; + default: + break; + } + + return 0; +} + int sapp_dup_stream_search(struct streaminfo *a_stream) { void *key=NULL; @@ -102,7 +219,7 @@ int sapp_dup_stream_search(struct streaminfo *a_stream) } struct timeval now={g_CurrentTime, 0}; - return bloom_check(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now); + return bloom_check_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); } @@ -121,7 +238,7 @@ void sapp_dup_stream_add(struct streaminfo *a_stream) key_len=sizeof(struct stream_tuple4_v6); } struct timeval now={g_CurrentTime, 0}; - bloom_add(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now); + bloom_add_partition(sapp_global_val->mthread_volatile[a_stream->threadnum]->dup_pkt_distinguish_handle, key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); } static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_type_t stream_type, const void *iphdr, @@ -141,8 +258,8 @@ static inline int sapp_set_dup_pkt_key(enum addr_type_t addr_type, enum stream_t if (stream_type == STREAM_TYPE_UDP) { const struct mesa_udp_hdr *uhdr = (const struct mesa_udp_hdr *)l4_hdr; - l4->tcp_seq = 0x5A5A5A5A; /* udp没有, 但设成0可能对某些hash函数不友好, 此处设个固定值 */ - l4->tcp_ack = 0xA5A5A5A5; /* udp没有, 但设成0可能对某些hash函数不友好, 此处设个固定值 */ + l4->tcp_seq = 0x5A5A5A5A; /* udp没有, 但设�?0可能对某些hash函数不友�?, 此处设个固定�? 
*/ + l4->tcp_ack = 0xA5A5A5A5; /* udp没有, 但设�?0可能对某些hash函数不友�?, 此处设个固定�? */ l4->sport = uhdr->uh_sport; l4->dport = uhdr->uh_dport; l4->checksum = uhdr->uh_sum; @@ -182,12 +299,12 @@ int sapp_dup_pkt_identify(int tid, struct streaminfo_private *pstream_pr, struct timeval now={g_CurrentTime, 0}; size_t key_len = sapp_set_dup_pkt_key((enum addr_type_t)pstream_pr->stream_public.addr.addrtype, (enum stream_type_t)pstream_pr->stream_public.type, ip_hdr, (void *)l4_hdr, &dup_bloom_key); - is_dup_pkt= bloom_check(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now); + is_dup_pkt= bloom_check_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); if(is_dup_pkt != 0){ pstream_pr->has_duplicate_pkt = 1; }else{ if(need_add_bloom_filter){ - bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now); + bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); } } return is_dup_pkt; @@ -200,7 +317,7 @@ void sapp_dup_pkt_mark_l4(struct streaminfo *a_stream, const void *ip_hdr, const int ret = 0; struct timeval now={g_CurrentTime, 0}; key_len = sapp_set_dup_pkt_key((enum addr_type_t)a_stream->addr.addrtype, (enum stream_type_t)a_stream->type, ip_hdr, l4_hdr, &dup_bloom_key); - bloom_add(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, &sapp_global_val->config.packet_io.dup_pkt_para, now); + bloom_add_partition(sapp_global_val->mthread_volatile[tid]->dup_pkt_distinguish_handle, (const char*)&dup_bloom_key, key_len, 
&sapp_global_val->config.packet_io.dup_pkt_para, now, g_current_time_ms); return; } @@ -216,12 +333,12 @@ int sapp_dup_pkt_init(void) return 0; } - /* 流量入口有三个开关, 识别句柄只有一个, - 拆分句柄多耗费内存, 且判断时每个包需要扫描多个句柄, + /* 流量入口有三个开�?, 识别句柄只有一�?, + 拆分句柄多耗费内存, 且判断时每个包需要扫描多个句�?, 可能好处只是准确率高了一点点而已 */ struct timeval now={time(NULL), 0}; for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){ - sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new(p_dup_conf, now); + sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle = bloom_new_partition(p_dup_conf, now, g_current_time_ms); if(NULL == sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){ sapp_log(RLOG_LV_FATAL, 1, 1, "dup_pkt_distinguish init error!"); return -1; @@ -238,7 +355,7 @@ void sapp_dup_pkt_destroy(void) const sapp_dup_pkt_t *p_dup_conf = &sapp_global_val->config.packet_io.dup_pkt_para; for(i = 0; i < sapp_global_val->config.cpu.worker_threads; i++){ if(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle){ - bloom_free(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf); + bloom_free_partition(sapp_global_val->mthread_volatile[i]->dup_pkt_distinguish_handle,p_dup_conf); } } diff --git a/src/sapp_dev/sapp_init.c b/src/sapp_dev/sapp_init.c index f6ef015..5c1d247 100644 --- a/src/sapp_dev/sapp_init.c +++ b/src/sapp_dev/sapp_init.c @@ -303,6 +303,8 @@ int MESA_platform_init(int argc, char *argv[]) sapp_set_current_state(SAPP_STATE_PKT_IO_INITED); + sapp_mem_init(); + sapp_runtime_log(RLOG_LV_DEBUG, "sapp platform init success"); return 0; diff --git a/src/support/ap_bloom/src/ap_bloom.c b/src/support/ap_bloom/src/ap_bloom.c index 79cf47d..4064e3e 100644 --- a/src/support/ap_bloom/src/ap_bloom.c +++ b/src/support/ap_bloom/src/ap_bloom.c @@ -158,12 +158,14 @@ struct ap_state int consecutive_matches; int visited_slices; int hash_num; + int counter; UT_array slice_time_events; }; void ap_state_init(struct ap_state *state, int hash_num) { 
state->consecutive_matches=0; state->visited_slices=0; + state->counter=0; state->hash_num=hash_num; utarray_init(&state->slice_time_events, &slice_event_icd); utarray_reserve(&state->slice_time_events, hash_num*2); @@ -172,6 +174,7 @@ void ap_state_clear(struct ap_state *state) { state->consecutive_matches=0; state->visited_slices=0; + state->counter=0; utarray_clear(&state->slice_time_events); } void ap_state_done(struct ap_state *state) @@ -188,6 +191,7 @@ int ap_state_is_match(struct ap_state *state) { return 1; } + if(state->counter >= state->hash_num)return 1; utarray_sort(&state->slice_time_events, slice_event_cmp); while((ev=(struct ap_slice_event*)utarray_next(&state->slice_time_events, ev))) { @@ -221,6 +225,10 @@ static void ap_slice_chain_check_hash(const struct ap_slice *head, struct double ev.hash_index = slice->hash_index; utarray_push_back(&state->slice_time_events, &ev); found=1; + if(timercmp(&slice->last_insert, &slice->first_insert, >)) + { + state->counter++; + } } state->visited_slices++; } diff --git a/src/support/dablooms/CMakeLists.txt b/src/support/dablooms/CMakeLists.txt index f7e8ecf..58369a1 100644 --- a/src/support/dablooms/CMakeLists.txt +++ b/src/support/dablooms/CMakeLists.txt @@ -1,4 +1,4 @@ cmake_minimum_required(VERSION 2.8...3.10) add_definitions(-fPIC) -add_library(libdabloom STATIC src/dablooms.c src/murmur.c)
\ No newline at end of file +add_library(libdabloom STATIC src/dablooms.c src/murmur.c src/mem_util.c)
\ No newline at end of file diff --git a/src/support/dablooms/src/dablooms.c b/src/support/dablooms/src/dablooms.c index 9f52a26..fbdcd31 100755 --- a/src/support/dablooms/src/dablooms.c +++ b/src/support/dablooms/src/dablooms.c @@ -12,36 +12,39 @@ #include <unistd.h> #include <errno.h> #include <time.h> +#include <assert.h> #include "murmur.h" #include "dablooms.h" +#include "mem_util.h" -#define DABLOOMS_VERSION "0.9.1" +#define DABLOOMS_VERSION "1.0.1" #define ERROR_TIGHTENING_RATIO 0.5 #define SALT_CONSTANT 0x97c29b3a -#define ALLOC(type, number) ((type *)calloc(sizeof(type), number)) -#define FREE(p) {free(p);p=NULL;} +#define ALLOC(type, number, mstat) ((type *)dabm_calloc(number, sizeof(type), mstat)) +#define FREE(p, mstat) dabm_free(p, mstat) +#define REALLOC(p, size, mstat) dabm_realloc(p, size, mstat) const char *dablooms_version(void) { return DABLOOMS_VERSION; } -void free_bitmap(bitmap_t *bitmap) +static void free_bitmap(bitmap_t *bitmap, struct dabm_mem_stat *mstat) { #if 0 if ((munmap(bitmap->array, bitmap->bytes)) < 0) { perror("Error, unmapping memory"); } #else - FREE(bitmap->array); + FREE(bitmap->array, mstat); #endif - FREE(bitmap); + FREE(bitmap, mstat); } -bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size) +static bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size, struct dabm_mem_stat *mstat) { #if 0 @@ -74,18 +77,20 @@ bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size) #else if (bitmap->array != NULL) { - bitmap->array = (char *)realloc(bitmap->array, new_size); - if (bitmap->array == NULL) - { - perror("Error resizing memory"); - free_bitmap(bitmap); - return NULL; - } + bitmap->array = (char *)REALLOC(bitmap->array, new_size, mstat); + assert(bitmap->array); + // if (bitmap->array == NULL) + // { + // perror("Error resizing memory"); + // free_bitmap(bitmap, mstat); + // return NULL; + // } memset(bitmap->array + old_size, 0, new_size - old_size); - } + 
//printf("bitmap_resize(), %p, new:%d, old:%d\n", bitmap,(int)new_size, (int)old_size); + } else { - bitmap->array = ALLOC(char, new_size); + bitmap->array = ALLOC(char, new_size, mstat); } #endif bitmap->bytes = new_size; @@ -94,22 +99,22 @@ bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size) /* Create a new bitmap, not full featured, simple to give * us a means of interacting with the 4 bit counters */ -bitmap_t *new_bitmap(size_t bytes) +static bitmap_t *new_bitmap(size_t bytes, struct dabm_mem_stat *mstat) { bitmap_t *bitmap; - bitmap = ALLOC(bitmap_t, 1); + bitmap = ALLOC(bitmap_t, 1, mstat); bitmap->bytes = bytes; - if ((bitmap = bitmap_resize(bitmap, 0, bytes)) == NULL) { + if ((bitmap = bitmap_resize(bitmap, 0, bytes, mstat)) == NULL) { return NULL; } return bitmap; } -int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset) +static inline int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset) { long access = index / 2 + offset; uint8_t temp; @@ -158,7 +163,7 @@ int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset) } /* decrements the four bit counter */ -int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset) +static inline int bitmap_check(const bitmap_t *bitmap, unsigned int index, long offset) { long access = index / 2 + offset; if (index % 2 != 0 ) { @@ -168,19 +173,19 @@ int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset) } } -int bitmap_flush(bitmap_t *bitmap) -{ -#if 0 - if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) { - perror("Error, flushing bitmap to disk"); - return -1; - } else { - return 0; - } -#else - return 0; -#endif -} +// int bitmap_flush(bitmap_t *bitmap) +// { +// #if 0 +// if ((msync(bitmap->array, bitmap->bytes, MS_SYNC) < 0)) { +// perror("Error, flushing bitmap to disk"); +// return -1; +// } else { +// return 0; +// } +// #else +// return 0; +// #endif +// } /* * Perform the actual hashing for `key` @@ -191,36 +196,34 @@ 
int bitmap_flush(bitmap_t *bitmap) * See paper by Kirsch, Mitzenmacher [2006] * http://www.eecs.harvard.edu/~michaelm/postscripts/rsa2008.pdf */ -void hash_func(counting_bloom_t *bloom, const char *key, size_t key_len, uint32_t *hashes) +static inline void hash_func(uint32_t *hashes, const uint32_t checksum[4], size_t nfuncs, uint32_t counts_per_func) { - uint32_t checksum[4]; - - MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum); + // uint32_t checksum[4]; + // MurmurHash3_x64_128(key, key_len, SALT_CONSTANT, checksum); uint32_t h1 = checksum[0]; uint32_t h2 = checksum[1]; - - for (size_t i = 0; i < bloom->nfuncs; i++) { - hashes[i] = (h1 + i * h2) % bloom->counts_per_func; + for (size_t i = 0; i < nfuncs; i++) { + hashes[i] = (h1 + i * h2) % counts_per_func; } } -int free_counting_bloom(counting_bloom_t *bloom) -{ - if (bloom != NULL) { - FREE(bloom->hashes); - bloom->hashes = NULL; - free_bitmap(bloom->bitmap); - FREE(bloom); - bloom = NULL; - } - return 0; -} +// int free_counting_bloom(counting_bloom_t *bloom) +// { +// if (bloom != NULL) { +// FREE(bloom->hashes); +// bloom->hashes = NULL; +// free_bitmap(bloom->bitmap); +// FREE(bloom); +// bloom = NULL; +// } +// return 0; +// } -counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset) +counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, long offset, struct dabm_mem_stat *mstat) { counting_bloom_t *bloom; - bloom = ALLOC(counting_bloom_t, 1); + bloom = ALLOC(counting_bloom_t, 1, mstat); bloom->bitmap = NULL; bloom->capacity = capacity; bloom->error_rate = error_rate; @@ -230,30 +233,32 @@ counting_bloom_t *counting_bloom_init(unsigned int capacity, double error_rate, bloom->size = bloom->nfuncs * bloom->counts_per_func; /* rounding-up integer divide by 2 of bloom->size */ bloom->num_bytes = ((bloom->size + 1) / 2) + sizeof(counting_bloom_header_t); - bloom->hashes = ALLOC(uint32_t, bloom->nfuncs); + bloom->hashes = ALLOC(uint32_t, 
bloom->nfuncs, mstat); return bloom; } -counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate) +static counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate, struct dabm_mem_stat *mstat) { counting_bloom_t *cur_bloom; - cur_bloom = counting_bloom_init(capacity, error_rate, 0); - cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes); + cur_bloom = counting_bloom_init(capacity, error_rate, 0, mstat); + cur_bloom->bitmap = new_bitmap(cur_bloom->num_bytes, mstat); cur_bloom->header = (counting_bloom_header_t *)(cur_bloom->bitmap->array); return cur_bloom; } -int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len) +static int counting_bloom_add(counting_bloom_t *bloom, const uint32_t checksum[4]) { unsigned int index, i, offset; unsigned int *hashes = bloom->hashes; + size_t nfuncs = bloom->nfuncs; + uint32_t counts_per_func = bloom->counts_per_func; + + hash_func(hashes, checksum, nfuncs, counts_per_func); - hash_func(bloom, s, len, hashes); - - for (i = 0; i < bloom->nfuncs; i++) { - offset = i * bloom->counts_per_func; + for (i = 0; i < nfuncs; i++) { + offset = i * counts_per_func; index = hashes[i] + offset; bitmap_increment(bloom->bitmap, index, bloom->offset); } @@ -262,32 +267,34 @@ int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len) return 0; } -int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len) -{ - unsigned int index, i, offset; - unsigned int *hashes = bloom->hashes; +// int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len) +// { +// unsigned int index, i, offset; +// unsigned int *hashes = bloom->hashes; - hash_func(bloom, s, len, hashes); +// hash_func(bloom, s, len, hashes); - for (i = 0; i < bloom->nfuncs; i++) { - offset = i * bloom->counts_per_func; - index = hashes[i] + offset; - bitmap_decrement(bloom->bitmap, index, bloom->offset); - } - bloom->header->count--; +// for (i = 0; i < bloom->nfuncs; i++) { +// 
offset = i * bloom->counts_per_func; +// index = hashes[i] + offset; +// bitmap_decrement(bloom->bitmap, index, bloom->offset); +// } +// bloom->header->count--; - return 0; -} +// return 0; +// } -int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len) +static int counting_bloom_check(const counting_bloom_t *bloom, const uint32_t checksum[4]) { unsigned int index, i, offset; unsigned int *hashes = bloom->hashes; + size_t nfuncs = bloom->nfuncs; + uint32_t counts_per_func = bloom->counts_per_func; + + hash_func(hashes, checksum, nfuncs, counts_per_func); - hash_func(bloom, s, len, hashes); - - for (i = 0; i < bloom->nfuncs; i++) { - offset = i * bloom->counts_per_func; + for (i = 0; i < nfuncs; i++) { + offset = i * counts_per_func; index = hashes[i] + offset; if (!(bitmap_check(bloom->bitmap, index, bloom->offset))) { return 0; @@ -296,39 +303,40 @@ int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len) return 1; } -int free_scaling_bloom(scaling_bloom_t *bloom) +static int free_scaling_bloom(scaling_bloom_t *bloom, struct dabm_mem_stat *mstat) { int i; for (i = bloom->num_blooms - 1; i >= 0; i--) { - FREE(bloom->blooms[i]->hashes); + FREE(bloom->blooms[i]->hashes, mstat); bloom->blooms[i]->hashes = NULL; - FREE(bloom->blooms[i]); + FREE(bloom->blooms[i], mstat); bloom->blooms[i] = NULL; } - FREE(bloom->blooms); - free_bitmap(bloom->bitmap); - FREE(bloom); + FREE(bloom->blooms, mstat); + free_bitmap(bloom->bitmap, mstat); + FREE(bloom, mstat); return 0; } /* creates a new counting bloom filter from a given scaling bloom filter, with count and id */ -counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom) +counting_bloom_t *new_counting_bloom_from_scale(scaling_bloom_t *bloom, struct dabm_mem_stat *mstat) { long offset; double error_rate; counting_bloom_t *cur_bloom; error_rate = bloom->error_rate * (pow(ERROR_TIGHTENING_RATIO, bloom->num_blooms + 1)); - - if ((bloom->blooms = (counting_bloom_t 
**)realloc(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *))) == NULL) { - fprintf(stderr, "Error, could not realloc a new bloom filter\n"); - return NULL; - } - - cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes); + bloom->blooms = (counting_bloom_t **)REALLOC(bloom->blooms, (bloom->num_blooms + 1) * sizeof(counting_bloom_t *), mstat); + assert(bloom->blooms); + // if (unlikely(bloom->blooms == NULL)) { + // fprintf(stderr, "Error, could not realloc a new bloom filter\n"); + // return NULL; + // } + + cur_bloom = counting_bloom_init(bloom->capacity, error_rate, bloom->num_bytes ,mstat); bloom->blooms[bloom->num_blooms] = cur_bloom; - bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes); + bloom->bitmap = bitmap_resize(bloom->bitmap, bloom->num_bytes, bloom->num_bytes + cur_bloom->num_bytes, mstat); /* reset header pointer, as mmap may have moved */ bloom->header = (scaling_bloom_header_t *) bloom->bitmap->array; @@ -351,17 +359,18 @@ uint64_t scaling_bloom_clear_seqnums(scaling_bloom_t *bloom) { uint64_t seqnum; - if (bloom->header->disk_seqnum != 0) { - // disk_seqnum cleared on disk before any other changes - bloom->header->disk_seqnum = 0; - bitmap_flush(bloom->bitmap); - } + // if (bloom->header->disk_seqnum != 0) { + // // disk_seqnum cleared on disk before any other changes + // bloom->header->disk_seqnum = 0; + // // bitmap_flush(bloom->bitmap); + // } + bloom->header->disk_seqnum = 0; seqnum = bloom->header->mem_seqnum; bloom->header->mem_seqnum = 0; return seqnum; } -int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id) +static int scaling_bloom_add(scaling_bloom_t *bloom, const uint32_t checksum[4], uint64_t id, struct dabm_mem_stat *mstat) { int i; uint64_t seqnum; @@ -377,84 +386,84 @@ int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_ seqnum = scaling_bloom_clear_seqnums(bloom); if ((id > 
bloom->header->max_id) && (cur_bloom->header->count >= cur_bloom->capacity - 1)) { - cur_bloom = new_counting_bloom_from_scale(bloom); + cur_bloom = new_counting_bloom_from_scale(bloom, mstat); cur_bloom->header->count = 0; cur_bloom->header->id = bloom->header->max_id + 1; } if (bloom->header->max_id < id) { bloom->header->max_id = id; } - counting_bloom_add(cur_bloom, s, len); + counting_bloom_add(cur_bloom, checksum); bloom->header->mem_seqnum = seqnum + 1; return 1; } -int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id) -{ - counting_bloom_t *cur_bloom; - int i; - uint64_t seqnum; +// int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id) +// { +// counting_bloom_t *cur_bloom; +// int i; +// uint64_t seqnum; - for (i = bloom->num_blooms - 1; i >= 0; i--) { - cur_bloom = bloom->blooms[i]; - if (id >= cur_bloom->header->id) { - seqnum = scaling_bloom_clear_seqnums(bloom); +// for (i = bloom->num_blooms - 1; i >= 0; i--) { +// cur_bloom = bloom->blooms[i]; +// if (id >= cur_bloom->header->id) { +// seqnum = scaling_bloom_clear_seqnums(bloom); - counting_bloom_remove(cur_bloom, s, len); +// counting_bloom_remove(cur_bloom, s, len); - bloom->header->mem_seqnum = seqnum + 1; - return 1; - } - } - return 0; -} - -int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len) +// bloom->header->mem_seqnum = seqnum + 1; +// return 1; +// } +// } +// return 0; +// } + +static int scaling_bloom_check(scaling_bloom_t *bloom, const uint32_t checksum[4]) { int i; counting_bloom_t *cur_bloom; for (i = bloom->num_blooms - 1; i >= 0; i--) { cur_bloom = bloom->blooms[i]; - if (counting_bloom_check(cur_bloom, s, len)) { + if (counting_bloom_check(cur_bloom, checksum)) { return 1; } } return 0; } -int scaling_bloom_flush(scaling_bloom_t *bloom) -{ - if (bitmap_flush(bloom->bitmap) != 0) { - return -1; - } - // all changes written to disk before disk_seqnum set - if (bloom->header->disk_seqnum == 0) 
{ - bloom->header->disk_seqnum = bloom->header->mem_seqnum; - return bitmap_flush(bloom->bitmap); - } - return 0; -} - -uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom) -{ - return bloom->header->mem_seqnum; -} - -uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom) -{ - return bloom->header->disk_seqnum; -} - -scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate) +// int scaling_bloom_flush(scaling_bloom_t *bloom) +// { +// if (bitmap_flush(bloom->bitmap) != 0) { +// return -1; +// } +// // all changes written to disk before disk_seqnum set +// if (bloom->header->disk_seqnum == 0) { +// bloom->header->disk_seqnum = bloom->header->mem_seqnum; +// return bitmap_flush(bloom->bitmap); +// } +// return 0; +// } + +// uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom) +// { +// return bloom->header->mem_seqnum; +// } + +// uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom) +// { +// return bloom->header->disk_seqnum; +// } + +scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate,struct dabm_mem_stat *mstat) { scaling_bloom_t *bloom; - bloom = ALLOC(scaling_bloom_t, 1); - if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t))) == NULL) { + bloom = ALLOC(scaling_bloom_t, 1, mstat); + if ((bloom->bitmap = new_bitmap(sizeof(scaling_bloom_header_t), mstat)) == NULL) { fprintf(stderr, "Error, Could not create bitmap with file\n"); - free_scaling_bloom(bloom); + free_scaling_bloom(bloom, mstat); return NULL; } @@ -468,19 +477,20 @@ scaling_bloom_t *scaling_bloom_init(unsigned int capacity, double error_rate) return bloom; } -scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate) +static scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate, struct dabm_mem_stat *mstat) { scaling_bloom_t *bloom; counting_bloom_t *cur_bloom; - bloom = scaling_bloom_init(capacity, error_rate); - - if (!(cur_bloom = new_counting_bloom_from_scale(bloom))) { - 
fprintf(stderr, "Error, Could not create counting bloom\n"); - free_scaling_bloom(bloom); - return NULL; - } + bloom = scaling_bloom_init(capacity, error_rate, mstat); + cur_bloom = new_counting_bloom_from_scale(bloom, mstat); + // if (!()) { + // fprintf(stderr, "Error, Could not create counting bloom\n"); + // free_scaling_bloom(bloom, mstat); + // return NULL; + // } + assert(cur_bloom); cur_bloom->header->count = 0; cur_bloom->header->id = 0; @@ -492,15 +502,17 @@ scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate) struct expiry_dablooms_handle{ scaling_bloom_t *cur_bloom; scaling_bloom_t *next_bloom; - time_t cur_bloom_start; - time_t next_bloom_start; - time_t last_bloom_check; + long cur_bloom_start_ms; + long next_bloom_start_ms; + long last_bloom_check_ms; uint64_t cur_bloom_inc_id; uint64_t next_bloom_inc_id; unsigned int capacity; - int expiry_time; - time_t cur_time; + long expiry_time_ms; + long cur_time_ms; + long transition_time_ms; double error_rate; + struct dabm_mem_stat mstat; }; char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){ @@ -517,28 +529,29 @@ char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno){ void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle){ if(handle != NULL){ if(handle->cur_bloom != NULL){ - free_scaling_bloom(handle->cur_bloom); + free_scaling_bloom(handle->cur_bloom, &handle->mstat); } if(handle->next_bloom != NULL){ - free_scaling_bloom(handle->next_bloom); + free_scaling_bloom(handle->next_bloom, &handle->mstat); } - FREE(handle); + free(handle); } } -struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time){ - struct expiry_dablooms_handle *handle = ALLOC(struct expiry_dablooms_handle, 1); - scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate); +struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long 
expiry_time_ms, long transition_time_ms){ + struct expiry_dablooms_handle *handle = (struct expiry_dablooms_handle *)calloc(1, sizeof(struct expiry_dablooms_handle)); + scaling_bloom_t *cur_bloom = new_scaling_bloom(capacity, error_rate, &handle->mstat); if(cur_bloom == NULL){ goto error_out; } handle->cur_bloom = cur_bloom; handle->cur_bloom_inc_id = 0; - handle->cur_bloom_start=cur_time; + handle->cur_bloom_start_ms=cur_time_ms; handle->capacity = capacity; handle->error_rate = error_rate; - handle->expiry_time = expiry_time; - handle->cur_time = cur_time; + handle->expiry_time_ms = expiry_time_ms; + handle->cur_time_ms = cur_time_ms; + handle->transition_time_ms = transition_time_ms; return handle; error_out: @@ -546,91 +559,123 @@ error_out: return NULL; } -int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){ - if(handle == NULL || handle->cur_bloom == NULL){ - return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; +// int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count){ +// if(handle == NULL || handle->cur_bloom == NULL){ +// return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; +// } +// *count = handle->cur_bloom_inc_id; +// return 0; +// } + +static inline int bloom_in_transition_period(const struct expiry_dablooms_handle *handle, long cur_time_ms) +{ + if((cur_time_ms - handle->cur_bloom_start_ms) + handle->transition_time_ms < handle->expiry_time_ms){ + return 0; } - *count = handle->cur_bloom_inc_id; - return 0; + return 1; } -static int bloom_expired_check(struct expiry_dablooms_handle *handle, time_t cur_time){ - if(handle == NULL || handle->cur_bloom == NULL){ - return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; - } - if(cur_time <= handle->last_bloom_check){ +static int bloom_expired_check(struct expiry_dablooms_handle *handle, long cur_time_ms, struct dabm_mem_stat *mstat){ + // if(unlikely(handle == NULL || handle->cur_bloom == NULL)){ + // return EXPIRY_DABLOOMS_ERRNO_BLOOM_NULL; + // } + 
if(likely(cur_time_ms <= handle->last_bloom_check_ms + 1000)){ return 0; } - time_t delta_time = cur_time - handle->cur_bloom_start; - handle->cur_time=cur_time; - if(delta_time >= handle->expiry_time){ - free_scaling_bloom(handle->cur_bloom); + long delta_time_ms = cur_time_ms - handle->cur_bloom_start_ms; + handle->cur_time_ms=cur_time_ms; + if(unlikely(delta_time_ms >= handle->expiry_time_ms)){ + free_scaling_bloom(handle->cur_bloom, mstat); if(handle->next_bloom != NULL){ handle->cur_bloom = handle->next_bloom; - handle->cur_bloom_start = handle->next_bloom_start; + handle->cur_bloom_start_ms = handle->next_bloom_start_ms; handle->cur_bloom_inc_id = handle->next_bloom_inc_id; handle->next_bloom = NULL; - handle->last_bloom_check=0; + handle->last_bloom_check_ms=0; } else{ - scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate); - if(cur_bloom == NULL){ + scaling_bloom_t *cur_bloom = new_scaling_bloom(handle->capacity, handle->error_rate, mstat); + if(unlikely(cur_bloom == NULL)){ return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL; } handle->cur_bloom = cur_bloom; handle->cur_bloom_inc_id = 0; - handle->cur_bloom_start=cur_time; - handle->last_bloom_check=0; + handle->cur_bloom_start_ms=cur_time_ms; + handle->last_bloom_check_ms=0; } } else { - handle->last_bloom_check=cur_time; + handle->last_bloom_check_ms=cur_time_ms; } return 0; } -int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time){ - if(key==NULL || len ==0 || handle == NULL) - { - return -1; - } - int ret = bloom_expired_check(handle, cur_time); - if(ret < 0) +int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){ + assert(!(key==NULL || len ==0 || handle == NULL)); + int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat); + if(unlikely(ret < 0)) { return ret; } + __builtin_prefetch(key, 0, 0); + uint32_t checksum[4]; + MurmurHash3_x64_128(key, len, 
SALT_CONSTANT, checksum); + + long delta_time_ms = cur_time_ms - handle->cur_bloom_start_ms; + handle->cur_time_ms=cur_time_ms; - scaling_bloom_add(handle->cur_bloom, key, len, handle->cur_bloom_inc_id); - handle->cur_bloom_inc_id++; - time_t delta_time = cur_time - handle->cur_bloom_start; - handle->cur_time=cur_time; - if(delta_time >= handle->expiry_time){ - if(handle->next_bloom == NULL){ - scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate); + int is_transition = bloom_in_transition_period(handle, cur_time_ms); + if(unlikely(is_transition != 0)){ + if(unlikely(handle->next_bloom == NULL)){ + scaling_bloom_t *next_bloom = new_scaling_bloom(handle->capacity, handle->error_rate, &handle->mstat); if(next_bloom == NULL){ return EXPIRY_DABLOOMS_ERRNO_NEW_BLOOM_FAIL; } handle->next_bloom = next_bloom; handle->next_bloom_inc_id = 0; - handle->next_bloom_start=cur_time; + handle->next_bloom_start_ms=cur_time_ms; } - scaling_bloom_add(handle->next_bloom, key, len, handle->next_bloom_inc_id); + scaling_bloom_add(handle->next_bloom, checksum, handle->next_bloom_inc_id, &handle->mstat); handle->next_bloom_inc_id++; + }else{ + scaling_bloom_add(handle->cur_bloom, checksum, handle->cur_bloom_inc_id, &handle->mstat); + handle->cur_bloom_inc_id++; } + return 0; } -int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time){ - if(key==NULL || len ==0 || handle == NULL) - { - return -1; - } - int ret = bloom_expired_check(handle, cur_time); - if(ret < 0) +int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, long cur_time_ms){ + assert(!(key==NULL || len ==0 || handle == NULL)); + int ret = bloom_expired_check(handle, cur_time_ms, &handle->mstat); + if(unlikely(ret < 0)) { return ret; } - int bloom_hit = scaling_bloom_check(handle->cur_bloom, key, len); - return bloom_hit; + + __builtin_prefetch(key, 0, 0); + uint32_t checksum[4]; + 
MurmurHash3_x64_128(key, len, SALT_CONSTANT, checksum); + + int bloom_hit_cur = 0; + int bloom_hit_next = 0; + int is_transition = bloom_in_transition_period(handle, cur_time_ms); + + if(unlikely(is_transition != 0) && (handle->next_bloom != NULL)){ + bloom_hit_next = scaling_bloom_check(handle->next_bloom, checksum); + } + + if(likely(0 == bloom_hit_next)){ + bloom_hit_cur = scaling_bloom_check(handle->cur_bloom, checksum); + } + + return bloom_hit_cur || bloom_hit_next; } + +int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes) +{ + *blocks = handle->mstat.blocks; + *bytes = handle->mstat.bytes; + return 0; +}
\ No newline at end of file diff --git a/src/support/dablooms/src/dablooms.h b/src/support/dablooms/src/dablooms.h index 017fb53..f93a9a0 100755 --- a/src/support/dablooms/src/dablooms.h +++ b/src/support/dablooms/src/dablooms.h @@ -5,6 +5,18 @@ #include <stdint.h> #include <stdlib.h> +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef likely +#define likely(x) __builtin_expect(!!(x), 1) +#endif + +#ifndef unlikely +#define unlikely(x) __builtin_expect(!!(x), 0) +#endif + const char *dablooms_version(void); typedef struct { @@ -13,15 +25,15 @@ typedef struct { } bitmap_t; -bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size); -bitmap_t *new_bitmap(size_t bytes); +// bitmap_t *bitmap_resize(bitmap_t *bitmap, size_t old_size, size_t new_size); +// bitmap_t *new_bitmap(size_t bytes); -int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset); +// int bitmap_increment(bitmap_t *bitmap, unsigned int index, long offset); int bitmap_decrement(bitmap_t *bitmap, unsigned int index, long offset); -int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset); +// int bitmap_check(bitmap_t *bitmap, unsigned int index, long offset); int bitmap_flush(bitmap_t *bitmap); -void free_bitmap(bitmap_t *bitmap); +// void free_bitmap(bitmap_t *bitmap); typedef struct { uint64_t id; @@ -43,11 +55,11 @@ typedef struct { bitmap_t *bitmap; } counting_bloom_t; -int free_counting_bloom(counting_bloom_t *bloom); -counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate); -int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len); +// int free_counting_bloom(counting_bloom_t *bloom); +// counting_bloom_t *new_counting_bloom(unsigned int capacity, double error_rate); +// int counting_bloom_add(counting_bloom_t *bloom, const char *s, size_t len); int counting_bloom_remove(counting_bloom_t *bloom, const char *s, size_t len); -int counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len); +// int 
counting_bloom_check(counting_bloom_t *bloom, const char *s, size_t len); typedef struct { uint64_t max_id; @@ -65,11 +77,11 @@ typedef struct { bitmap_t *bitmap; } scaling_bloom_t; -scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate); -int free_scaling_bloom(scaling_bloom_t *bloom); -int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id); +// scaling_bloom_t *new_scaling_bloom(unsigned int capacity, double error_rate); +// int free_scaling_bloom(scaling_bloom_t *bloom); +// int scaling_bloom_add(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id); int scaling_bloom_remove(scaling_bloom_t *bloom, const char *s, size_t len, uint64_t id); -int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len); +// int scaling_bloom_check(scaling_bloom_t *bloom, const char *s, size_t len); int scaling_bloom_flush(scaling_bloom_t *bloom); uint64_t scaling_bloom_mem_seqnum(scaling_bloom_t *bloom); uint64_t scaling_bloom_disk_seqnum(scaling_bloom_t *bloom); @@ -81,9 +93,13 @@ enum expiry_dablooms_errno{ }; char* expiry_dablooms_errno_trans(enum expiry_dablooms_errno _errno); void expiry_dablooms_destroy(struct expiry_dablooms_handle *handle); -struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, time_t cur_time, int expiry_time); +struct expiry_dablooms_handle* expiry_dablooms_init(unsigned int capacity, double error_rate, long cur_time_ms, long expiry_time_ms, long transition_time_ms); int expiry_dablooms_element_count_get(struct expiry_dablooms_handle *handle, uint64_t *count); int expiry_dablooms_add(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time); int expiry_dablooms_search(struct expiry_dablooms_handle *handle, const char *key, size_t len, time_t cur_time); +int expiry_dablooms_get_mem_stat(struct expiry_dablooms_handle *handle, long long *blocks, long long *bytes); +#ifdef __cplusplus +} +#endif #endif diff --git 
a/src/support/dablooms/src/mem_util.c b/src/support/dablooms/src/mem_util.c new file mode 100644 index 0000000..0978018 --- /dev/null +++ b/src/support/dablooms/src/mem_util.c @@ -0,0 +1,101 @@ +#include <sys/stat.h> +#include <stdint.h> +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <fcntl.h> +#include <math.h> +#include <string.h> +#include <sys/mman.h> +#include <unistd.h> +#include <errno.h> +#include <time.h> +#include <pthread.h> +#include "murmur.h" +#include "dablooms.h" +#include "mem_util.h" + +struct dabm_mem_hdr{ + size_t size; + char *user_ptr; +}; + +void *dabm_malloc(size_t user_size, struct dabm_mem_stat *mstat) +{ + size_t real_size = user_size + sizeof(struct dabm_mem_hdr); + char *real_ptr = (char *)malloc(real_size); + if (real_ptr) + { + struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)real_ptr; + stat->size = real_size; + stat->user_ptr = real_ptr + sizeof(struct dabm_mem_hdr); + mstat->blocks++; + mstat->bytes += real_size; + return stat->user_ptr; + } + return NULL; +} + +void *dabm_calloc(size_t nmemb, size_t user_size, struct dabm_mem_stat *mstat) +{ + size_t real_size = nmemb * user_size + sizeof(struct dabm_mem_hdr); + char *real_ptr = (char *)calloc(1, real_size); + if (real_ptr) + { + struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)real_ptr; + stat->size = real_size; + stat->user_ptr = real_ptr + sizeof(struct dabm_mem_hdr); + mstat->blocks++; + mstat->bytes += real_size; + return stat->user_ptr; + } + return NULL; +} + +void dabm_free(void *user_ptr, struct dabm_mem_stat *mstat) +{ + if (user_ptr) + { + struct dabm_mem_hdr *stat = (struct dabm_mem_hdr *)((char *)user_ptr - sizeof(struct dabm_mem_hdr)); + mstat->blocks--; + mstat->bytes -= stat->size; + free(stat); + } +} + +void *dabm_realloc(void *user_ptr, size_t new_size, struct dabm_mem_stat *mstat) +{ + char *old_real_ptr; + struct dabm_mem_hdr *stat; + + if (user_ptr) + { + stat = (struct dabm_mem_hdr *)((char *)user_ptr - sizeof(struct 
dabm_mem_hdr)); + old_real_ptr = (char *)stat; + mstat->bytes -= stat->size; + } + else + { + old_real_ptr = NULL; + mstat->blocks++; // no memory alloc before + } + + size_t real_size = new_size + sizeof(struct dabm_mem_hdr); + char *new_real_ptr = (char *)realloc(old_real_ptr, real_size); + if (new_real_ptr) + { + mstat->bytes += real_size; + stat = (struct dabm_mem_hdr *)new_real_ptr; + stat->size = real_size; + stat->user_ptr = new_real_ptr + sizeof(struct dabm_mem_hdr); + return stat->user_ptr; + } + + return NULL; +} + +int dabm_get_mem_hdr_size(void) +{ + return sizeof(struct dabm_mem_hdr); +} + diff --git a/src/support/dablooms/src/mem_util.h b/src/support/dablooms/src/mem_util.h new file mode 100644 index 0000000..f038392 --- /dev/null +++ b/src/support/dablooms/src/mem_util.h @@ -0,0 +1,22 @@ +#ifndef _DABM_MEM_UTIL_H_ +#define _DABM_MEM_UTIL_H_ 1 + +#ifdef __cplusplus +extern "C" +{ +#endif + struct dabm_mem_stat + { + long blocks; + long bytes; + }; + + void *dabm_malloc(size_t size, struct dabm_mem_stat *mstat); + void *dabm_calloc(size_t nmemb, size_t size, struct dabm_mem_stat *mstat); + void dabm_free(void *user_ptr, struct dabm_mem_stat *mstat); + void *dabm_realloc(void *ptr, size_t new_size, struct dabm_mem_stat *mstat); + int dabm_get_mem_hdr_size(void); +#ifdef __cplusplus +} +#endif +#endif
\ No newline at end of file diff --git a/src/support/dablooms/src/test_mem_util.c b/src/support/dablooms/src/test_mem_util.c new file mode 100644 index 0000000..f8176d1 --- /dev/null +++ b/src/support/dablooms/src/test_mem_util.c @@ -0,0 +1,63 @@ +#include <sys/stat.h> +#include <stdint.h> +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <fcntl.h> +#include <math.h> +#include <string.h> +#include <sys/mman.h> +#include <unistd.h> +#include <errno.h> +#include <assert.h> +#include <time.h> +#include "murmur.h" +#include "dablooms.h" +#include "mem_util.h" + +int main(int argc, char const *argv[]) +{ + int i; + struct dabm_mem_stat mstat = {}; + + char *ptr = (char *)dabm_malloc(100, &mstat); + assert(ptr != NULL); + + assert(mstat.blocks == 1); + assert(mstat.bytes == 100 + dabm_get_mem_hdr_size()); + dabm_free(ptr, &mstat); + assert(mstat.blocks == 0); + assert(mstat.bytes == 0); + + int *iptr = (int *)dabm_calloc(100, sizeof(int), &mstat); + assert(iptr != NULL); + for (i = 0; i < 100; i++) + { + assert(iptr[i] == 0); + } + assert(mstat.blocks == 1); + assert(mstat.bytes == 100 + dabm_get_mem_hdr_size()); + dabm_free(iptr, &mstat); + + ptr = dabm_realloc(NULL, 100, &mstat); + assert(ptr != NULL); + + assert(mstat.blocks == 1); + assert(mstat.bytes == 100 + dabm_get_mem_hdr_size()); + memset(ptr, 0xAA, 100); + + unsigned char *uptr = (unsigned char *)dabm_realloc(ptr, 1000, &mstat); + for (i = 0; i < 100; i++) + { + assert(uptr[i] == 0xAA); + } + assert(mstat.blocks == 1); + assert(mstat.bytes == 100 + dabm_get_mem_hdr_size()); + + dabm_free(uptr, &mstat); + assert(mstat.blocks == 0); + assert(mstat.bytes == 0); + + printf("test mem util success!\n"); + return 0; +} |
