diff options
| author | luwenpeng <[email protected]> | 2024-10-23 10:01:20 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-10-21 10:49:41 +0800 |
| commit | fd3cc20554cba6fe7ee7c671730079f81a2fbc5d (patch) | |
| tree | e38e5405a47fd5dff8c422d4b2109de99159ec4d /infra/session_manager | |
| parent | a7b79a0e227eb509699d0a864129e5013eff50fe (diff) | |
feature: packet IO support IP reassembly
Diffstat (limited to 'infra/session_manager')
| -rw-r--r-- | infra/session_manager/session_internal.h | 7 | ||||
| -rw-r--r-- | infra/session_manager/session_manager.c | 32 | ||||
| -rw-r--r-- | infra/session_manager/session_manager_runtime.c | 99 | ||||
| -rw-r--r-- | infra/session_manager/session_pool.c | 39 | ||||
| -rw-r--r-- | infra/session_manager/session_pool.h | 4 | ||||
| -rw-r--r-- | infra/session_manager/session_table.c | 2 | ||||
| -rw-r--r-- | infra/session_manager/session_utils.c | 2 | ||||
| -rw-r--r-- | infra/session_manager/test/gtest_overload_evict_tcp_sess.cpp | 2 | ||||
| -rw-r--r-- | infra/session_manager/test/gtest_overload_evict_udp_sess.cpp | 2 | ||||
| -rw-r--r-- | infra/session_manager/test/gtest_session_pool.cpp | 50 |
10 files changed, 121 insertions, 118 deletions
diff --git a/infra/session_manager/session_internal.h b/infra/session_manager/session_internal.h index 0ee8373..b22e484 100644 --- a/infra/session_manager/session_internal.h +++ b/infra/session_manager/session_internal.h @@ -14,15 +14,13 @@ extern "C" #include "stellar/session.h" #include "tcp_reassembly.h" -#define EX_DATA_MAX_COUNT 4 - // output format: "${src_addr}:${src_port}-${dst_addr}:${dst_port}-${ip_proto}-${domain}" // output max len: (46 + 1 + 5) + 1 + (46 + 1 + 5) + 1 + 1 + 1 + 20 = 129 #define TUPLE6_STR_SIZE 130 struct tcp_half { - struct tcp_reassembly *assembler; + struct tcp_reassembly *tcp_reass; struct tcp_segment in_order; // current packet in order segment uint32_t in_order_ref; // reference count of current packet in order segment @@ -62,7 +60,6 @@ struct session struct route_ctx route_ctx[MAX_FLOW_TYPE]; const struct packet *first_pkt[MAX_FLOW_TYPE]; const struct packet *curr_pkt; - void *ex_data[EX_DATA_MAX_COUNT]; void *user_data; int is_symmetric; int dup; @@ -75,7 +72,7 @@ struct session struct session_manager_stat *sess_mgr_stat; }; -TAILQ_HEAD(session_list, session); +TAILQ_HEAD(session_queue, session); void session_init(struct session *sess); diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index ecc55f6..4b2aed9 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -6,7 +6,7 @@ #include "stellar/session_manager.h" #include "stellar/module_manager.h" -#include "utils.h" +#include "utils_internal.h" #include "session_internal.h" #include "session_manager_runtime.h" @@ -15,7 +15,6 @@ #pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic ignored "-Wunused-function" - struct session_manager_schema { struct exdata_schema *exdata; @@ -133,6 +132,10 @@ fast_path: static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; + struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; + int thread_id = stellar_module_manager_get_thread_id(mod_mgr); + struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; + struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx); if (sess) { @@ -157,11 +160,17 @@ static void on_packet_output(enum packet_stage stage, struct packet *pkt, void * session_set_current_packet(sess, NULL); session_set_flow_type(sess, FLOW_TYPE_NONE); } + + if (packet_get_origin(pkt) == NULL) + { + session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt); + } } static void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms) { #define MAX_CLEANED_SESS 1024 + char buffer[4096] = {0}; struct session *sess = NULL; struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL}; @@ -169,6 +178,16 @@ static void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t for (uint64_t j = 0; j < used; j++) { sess = cleaned_sess[j]; + + session_to_str(sess, 0, buffer, sizeof(buffer)); + SESSION_MANAGER_LOG_INFO("session free: %s", buffer); + + // TODO publish session free msg + // TODO mq_runtime_dispatch_immediate() + + struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess); + exdata_runtime_free(exdata_rt); + session_manager_runtime_free_session(sess_mgr_rt, sess); } } @@ -236,8 +255,6 @@ struct session_manager_schema *session_manager_schema_new(struct packet_manager goto error_out; } - // TODO register polling - sess_mgr_schema->mq = mq; sess_mgr_schema->pkt_exdata_idx = packet_manager_new_packet_exdata_index(pkt_mgr, "session_manager", NULL, NULL); if (sess_mgr_schema->pkt_exdata_idx == -1) @@ -316,19 +333,16 @@ void session_manager_free(struct session_manager *sess_mgr) struct session_manager *session_manager_new(struct stellar_module_manager *mod_mgr, struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) { - assert(pkt_mgr); - assert(mq_schema); - assert(toml_file); uint64_t thread_num; uint64_t instance_id; uint64_t now_ms = clock_get_real_time_ms(); - if (load_and_validate_toml_integer_config(toml_file, "instance.id", (uint64_t *)&instance_id, 0, 4095)) + if (load_toml_integer_config(toml_file, "instance.id", (uint64_t *)&instance_id, 0, 4095)) { return NULL; } - if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM)) + if (load_toml_integer_config(toml_file, "packet_io.thread_num", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM)) { return NULL; } diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c index efb9e42..b2aa250 100644 --- a/infra/session_manager/session_manager_runtime.c +++ b/infra/session_manager/session_manager_runtime.c @@ -3,7 +3,7 @@ #include <assert.h> #include <errno.h> -#include "utils.h" +#include "utils_internal.h" #include "packet_helper.h" #include "packet_filter.h" #include "session_internal.h" @@ -28,7 +28,7 @@ struct snowflake struct session_manager_runtime { - struct session_list evicte_list; + struct session_queue evicte_list; struct session_pool *sess_pool; struct session_timer *sess_timer; struct session_table *tcp_sess_table; @@ -167,30 +167,30 @@ static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec) static void tcp_clean(struct session_manager_runtime *sess_mgr_rt, struct session *sess) { - struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[FLOW_TYPE_C2S].assembler; - struct tcp_reassembly *s2c_ssembler = sess->tcp_halfs[FLOW_TYPE_S2C].assembler; + struct tcp_reassembly *c2s_tcp_reass = sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass; + struct tcp_reassembly *s2c_tcp_reass = sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass; struct tcp_segment *seg; - if (c2s_ssembler) + if (c2s_tcp_reass) { - while ((seg = tcp_reassembly_expire(c2s_ssembler, UINT64_MAX))) + while ((seg = tcp_reassembly_expire(c2s_tcp_reass, UINT64_MAX))) { session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_SEGMENTS_RELEASED, 1); session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_PAYLOADS_RELEASED, seg->len); sess_mgr_rt->stat.tcp_segs_freed++; tcp_segment_free(seg); } - tcp_reassembly_free(c2s_ssembler); + tcp_reassembly_free(c2s_tcp_reass); } - if (s2c_ssembler) + if (s2c_tcp_reass) { - while ((seg = tcp_reassembly_expire(s2c_ssembler, UINT64_MAX))) + while ((seg = tcp_reassembly_expire(s2c_tcp_reass, UINT64_MAX))) { session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_SEGMENTS_RELEASED, 1); session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_PAYLOADS_RELEASED, seg->len); sess_mgr_rt->stat.tcp_segs_freed++; tcp_segment_free(seg); } - tcp_reassembly_free(s2c_ssembler); + tcp_reassembly_free(s2c_tcp_reass); } } @@ -201,18 +201,18 @@ static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session return 0; } - sess->tcp_halfs[FLOW_TYPE_C2S].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); - sess->tcp_halfs[FLOW_TYPE_S2C].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); - if (sess->tcp_halfs[FLOW_TYPE_C2S].assembler == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].assembler == NULL) + sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); + sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); + if (sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass == NULL) { tcp_clean(sess_mgr_rt, sess); return -1; } - SESSION_MANAGER_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p", + SESSION_MANAGER_LOG_DEBUG("session %lu %s new c2s tcp tcp_reass %p, s2c tcp tcp_reass %p", session_get_id(sess), session_get0_readable_addr(sess), - sess->tcp_halfs[FLOW_TYPE_C2S].assembler, - sess->tcp_halfs[FLOW_TYPE_S2C].assembler); + sess->tcp_halfs[FLOW_TYPE_C2S].tcp_reass, + sess->tcp_halfs[FLOW_TYPE_S2C].tcp_reass); return 0; } @@ -257,10 +257,10 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi if (unlikely(flags & TH_SYN)) { // len > 0 is SYN with data (TCP Fast Open) - tcp_reassembly_set_recv_next(half->assembler, len ? half->seq : half->seq + 1); + tcp_reassembly_set_recv_next(half->tcp_reass, len ? half->seq : half->seq + 1); } - seg = tcp_reassembly_expire(half->assembler, sess_mgr_rt->now_ms); + seg = tcp_reassembly_expire(half->tcp_reass, sess_mgr_rt->now_ms); if (seg) { session_inc_stat(sess, type, STAT_TCP_SEGMENTS_EXPIRED, 1); @@ -280,7 +280,7 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len); sess_mgr_rt->stat.tcp_segs_input++; - uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler); + uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->tcp_reass); // in order if (half->seq == rcv_nxt) { @@ -291,7 +291,7 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi half->in_order.data = tcp_layer->pld_ptr; half->in_order.len = len; half->in_order_ref = 0; - tcp_reassembly_inc_recv_next(half->assembler, len); + tcp_reassembly_inc_recv_next(half->tcp_reass, len); } // retransmission else if (uint32_before(uint32_add(half->seq, len), rcv_nxt)) @@ -302,7 +302,7 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi } else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len))) { - switch (tcp_reassembly_push(half->assembler, seg, sess_mgr_rt->now_ms)) + switch (tcp_reassembly_push(half->tcp_reass, seg, sess_mgr_rt->now_ms)) { case -2: session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1); @@ -481,39 +481,39 @@ struct session_manager_config *session_manager_config_new(const char *toml_file) } int ret = 0; - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); + ret += load_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1); + ret += load_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1); + ret += load_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024); + ret += load_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000); + ret += load_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000); + ret += load_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000); - ret += load_and_validate_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0); + ret += load_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1); + ret += load_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295); + ret += load_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000); + ret += load_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000); - ret += load_and_validate_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0); + ret += load_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1); + ret += load_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295); + ret += load_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000); + ret += load_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000); + ret += load_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512); if (ret != 0) { @@ -1066,9 +1066,6 @@ void session_manager_runtime_free_session(struct session_manager_runtime *sess_m { if (sess) { - struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess); - exdata_runtime_free(exdata_rt); - SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); SESSION_MANAGER_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); @@ -1353,7 +1350,7 @@ uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess { return mached_sess_num; } - capacity = session_pool_capacity_size(sess_mgr_rt->sess_pool); + capacity = sess_mgr_rt->cfg.tcp_session_max + sess_mgr_rt->cfg.udp_session_max; if (opts->cursor >= capacity) { return mached_sess_num; diff --git a/infra/session_manager/session_pool.c b/infra/session_manager/session_pool.c index 17be97a..43bef64 100644 --- a/infra/session_manager/session_pool.c +++ b/infra/session_manager/session_pool.c @@ -1,3 +1,4 @@ +#include <assert.h> #include <stdlib.h> #include "session_internal.h" @@ -6,8 +7,9 @@ struct session_pool { uint64_t capacity; - uint64_t available; - struct session_list free_list; + uint64_t used; + uint64_t free; + struct session_queue free_list; }; struct session_pool *session_pool_new(uint64_t capacity) @@ -17,7 +19,8 @@ struct session_pool *session_pool_new(uint64_t capacity) { return NULL; } - pool->available = 0; + pool->used = 0; + pool->free = 0; pool->capacity = capacity; TAILQ_INIT(&pool->free_list); @@ -26,7 +29,7 @@ struct session_pool *session_pool_new(uint64_t capacity) { struct session *sess = &array[i]; TAILQ_INSERT_TAIL(&pool->free_list, sess, free_tqe); - pool->available++; + pool->free++; } return pool; @@ -40,8 +43,10 @@ void session_pool_free(struct session_pool *pool) while ((sess = TAILQ_FIRST(&pool->free_list))) { TAILQ_REMOVE(&pool->free_list, sess, free_tqe); - pool->available--; + pool->free--; } + assert(pool->free == 0); + assert(pool->used == 0); free(pool); pool = NULL; @@ -59,7 +64,8 @@ struct session *session_pool_pop(struct session_pool *pool) if (sess) { TAILQ_REMOVE(&pool->free_list, sess, free_tqe); - pool->available--; + pool->free--; + pool->used++; } return sess; @@ -73,7 +79,8 @@ void session_pool_push(struct session_pool *pool, struct session *sess) } TAILQ_INSERT_TAIL(&pool->free_list, sess, free_tqe); - pool->available++; + pool->free++; + pool->used--; } const struct session *session_pool_get0(const struct session_pool *pool, uint64_t idx) @@ -87,22 +94,12 @@ const struct session *session_pool_get0(const struct session_pool *pool, uint64_ return &array[idx]; } -uint64_t session_pool_available_num(const struct session_pool *pool) +uint64_t session_pool_get_free_num(const struct session_pool *pool) { - if (pool == NULL) - { - return 0; - } - - return pool->available; + return pool->free; } -uint64_t session_pool_capacity_size(const struct session_pool *pool) +uint64_t session_pool_get_used_num(const struct session_pool *pool) { - if (pool == NULL) - { - return 0; - } - - return pool->capacity; + return pool->used; } diff --git a/infra/session_manager/session_pool.h b/infra/session_manager/session_pool.h index 9f607b9..e3509c8 100644 --- a/infra/session_manager/session_pool.h +++ b/infra/session_manager/session_pool.h @@ -15,8 +15,8 @@ struct session *session_pool_pop(struct session_pool *pool); void session_pool_push(struct session_pool *pool, struct session *sess); const struct session *session_pool_get0(const struct session_pool *pool, uint64_t idx); -uint64_t session_pool_available_num(const struct session_pool *pool); -uint64_t session_pool_capacity_size(const struct session_pool *pool); +uint64_t session_pool_get_free_num(const struct session_pool *pool); +uint64_t session_pool_get_used_num(const struct session_pool *pool); #ifdef __cplusplus } diff --git a/infra/session_manager/session_table.c b/infra/session_manager/session_table.c index 9f53bc6..93d7e58 100644 --- a/infra/session_manager/session_table.c +++ b/infra/session_manager/session_table.c @@ -15,7 +15,7 @@ struct session_table void *arg; uint64_t count; - struct session_list lru_list; + struct session_queue lru_list; }; /****************************************************************************** diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c index 83d6888..630cf59 100644 --- a/infra/session_manager/session_utils.c +++ b/infra/session_manager/session_utils.c @@ -214,7 +214,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess) } else { - struct tcp_segment *seg = tcp_reassembly_pop(half->assembler); + struct tcp_segment *seg = tcp_reassembly_pop(half->tcp_reass); if (seg) { session_inc_stat(sess, type, STAT_TCP_SEGMENTS_REORDERED, 1); diff --git a/infra/session_manager/test/gtest_overload_evict_tcp_sess.cpp b/infra/session_manager/test/gtest_overload_evict_tcp_sess.cpp index 6518104..d7ec896 100644 --- a/infra/session_manager/test/gtest_overload_evict_tcp_sess.cpp +++ b/infra/session_manager/test/gtest_overload_evict_tcp_sess.cpp @@ -1,6 +1,6 @@ #include <gtest/gtest.h> -#include "utils.h" +#include "utils_internal.h" #include "packet_internal.h" #include "packet_parser.h" #include "session_internal.h" diff --git a/infra/session_manager/test/gtest_overload_evict_udp_sess.cpp b/infra/session_manager/test/gtest_overload_evict_udp_sess.cpp index aefe600..cb7b8b6 100644 --- a/infra/session_manager/test/gtest_overload_evict_udp_sess.cpp +++ b/infra/session_manager/test/gtest_overload_evict_udp_sess.cpp @@ -1,6 +1,6 @@ #include <gtest/gtest.h> -#include "utils.h" +#include "utils_internal.h" #include "packet_internal.h" #include "packet_parser.h" #include "session_internal.h" diff --git a/infra/session_manager/test/gtest_session_pool.cpp b/infra/session_manager/test/gtest_session_pool.cpp index 658369f..db6df28 100644 --- a/infra/session_manager/test/gtest_session_pool.cpp +++ b/infra/session_manager/test/gtest_session_pool.cpp @@ -10,49 +10,47 @@ TEST(SESSION_POOL, POP_PUSH) struct session *sess4 = NULL; struct session_pool *sess_pool = NULL; + // new sess_pool = session_pool_new(3); EXPECT_TRUE(sess_pool != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 3); - EXPECT_TRUE(session_pool_capacity_size(sess_pool) == 3); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 3); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 0); + // pop sess1 = session_pool_pop(sess_pool); EXPECT_TRUE(sess1 != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 2); - sess2 = session_pool_pop(sess_pool); - EXPECT_TRUE(sess2 != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 1); - sess3 = session_pool_pop(sess_pool); - EXPECT_TRUE(sess3 != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 0); - sess4 = session_pool_pop(sess_pool); - EXPECT_TRUE(sess4 == NULL); - - session_pool_push(sess_pool, sess1); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 1); - session_pool_push(sess_pool, sess2); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 2); - session_pool_push(sess_pool, sess3); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 3); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 2); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 1); - sess1 = session_pool_pop(sess_pool); - EXPECT_TRUE(sess1 != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 2); sess2 = session_pool_pop(sess_pool); EXPECT_TRUE(sess2 != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 1); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 1); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 2); + sess3 = session_pool_pop(sess_pool); EXPECT_TRUE(sess3 != NULL); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 0); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 0); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 3); + sess4 = session_pool_pop(sess_pool); EXPECT_TRUE(sess4 == NULL); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 0); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 3); + // push session_pool_push(sess_pool, sess1); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 1); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 1); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 2); + session_pool_push(sess_pool, sess2); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 2); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 2); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 1); + session_pool_push(sess_pool, sess3); - EXPECT_TRUE(session_pool_available_num(sess_pool) == 3); + EXPECT_TRUE(session_pool_get_free_num(sess_pool) == 3); + EXPECT_TRUE(session_pool_get_used_num(sess_pool) == 0); + // free session_pool_free(sess_pool); } |
