diff options
| author | luwenpeng <[email protected]> | 2024-09-20 16:56:05 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-09-20 17:06:10 +0800 |
| commit | 94f1913e3e4cabee1d93d6de446f489dcf45ca62 (patch) | |
| tree | cfbe90db308e1c2c080ca4b46a80d7ccb255258c | |
| parent | 620019cf8e1c2be20a478ff8572bb0a9c22ddf3c (diff) | |
refactor(session manager): turning the session manager into a stellar module
| -rw-r--r-- | include/stellar/packet.h | 3 | ||||
| -rw-r--r-- | include/stellar/packet_manager.h | 7 | ||||
| -rw-r--r-- | include/stellar/session.h | 15 | ||||
| -rw-r--r-- | include/stellar/session_manager.h | 27 | ||||
| -rw-r--r-- | infra/packet_manager/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager.c | 44 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager_internal.h | 1 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager_private.h | 46 | ||||
| -rw-r--r-- | infra/packet_manager/packet_utils.c | 13 | ||||
| -rw-r--r-- | infra/session_manager/CMakeLists.txt | 5 | ||||
| -rw-r--r-- | infra/session_manager/session_manager.c | 1754 | ||||
| -rw-r--r-- | infra/session_manager/session_manager_runtime.c | 1462 | ||||
| -rw-r--r-- | infra/session_manager/session_manager_runtime.h (renamed from infra/session_manager/session_manager.h) | 0 | ||||
| -rw-r--r-- | infra/session_manager/session_utils.c | 17 | ||||
| -rw-r--r-- | infra/session_manager/test/default_config.h | 2 | ||||
| -rw-r--r-- | infra/stellar_core.c | 2 | ||||
| -rw-r--r-- | infra/stellar_stat.h | 2 | ||||
| -rw-r--r-- | infra/tcp_reassembly/tcp_reassembly.c | 24 | ||||
| -rw-r--r-- | infra/tcp_reassembly/tcp_reassembly.h | 1 | ||||
| -rw-r--r-- | infra/version.map | 6 |
20 files changed, 1935 insertions, 1498 deletions
diff --git a/include/stellar/packet.h b/include/stellar/packet.h index 5ea2e75..e910237 100644 --- a/include/stellar/packet.h +++ b/include/stellar/packet.h @@ -185,6 +185,9 @@ uint16_t packet_get_raw_len(const struct packet *pkt); const char *packet_get_payload(const struct packet *pkt); uint16_t packet_get_payload_len(const struct packet *pkt); +void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr); +void *packet_get_exdata(struct packet *pkt, int idx); + #ifdef __cplusplus } #endif diff --git a/include/stellar/packet_manager.h b/include/stellar/packet_manager.h index ebbff80..8270c7b 100644 --- a/include/stellar/packet_manager.h +++ b/include/stellar/packet_manager.h @@ -5,7 +5,8 @@ extern "C" { #endif -#include "packet.h" +#include "stellar/exdata.h" +#include "stellar/packet.h" enum packet_stage { @@ -19,8 +20,10 @@ enum packet_stage struct packet_manager; +int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg); + typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args); -int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args); +int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *args); // if two modules claim the same packet at the same stage, the second 'claim' fails. 
// return 0 on success diff --git a/include/stellar/session.h b/include/stellar/session.h index a1aa684..67c8c98 100644 --- a/include/stellar/session.h +++ b/include/stellar/session.h @@ -5,20 +5,8 @@ extern "C" { #endif -#include <stdint.h> - #include "stellar/packet.h" -struct tcp_segment; -const char *tcp_segment_get_data(const struct tcp_segment *seg); -uint16_t tcp_segment_get_len(const struct tcp_segment *seg); - -#define TOPIC_TCP_STREAM "TCP_STREAM" //topic message: tcp_segment -#define TOPIC_CONTROL_PACKET "CONTROL_PACKET" //topic message: packet - -#define TOPIC_TCP "TCP" //topic message: session -#define TOPIC_UDP "UDP" //topic message: session - enum session_state { SESSION_STATE_INIT = 0, @@ -155,6 +143,9 @@ const char *session_get0_readable_addr(const struct session *sess); void session_set_discard(struct session *sess); +void session_set_exdata(struct session *sess, int idx, void *ex_ptr); +void *session_get_exdata(const struct session *sess, int idx); + #ifdef __cplusplus } #endif diff --git a/include/stellar/session_manager.h b/include/stellar/session_manager.h new file mode 100644 index 0000000..81fc316 --- /dev/null +++ b/include/stellar/session_manager.h @@ -0,0 +1,27 @@ + + +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar/exdata.h" +#include "stellar/session.h" + +struct session_manager; + +int session_manager_new_packet_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg); + +typedef void on_session_callback(struct session *sess, struct packet *pkt, void *args); +typedef void on_tcp_stream_callback(struct session *sess, const char *tcp_payload, uint32_t tcp_payload_len, void *args); + +int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_callback *cb, void *args); +int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_callback *cb, void *args); +int session_manager_subscribe_control_packet(struct session_manager 
*sess_mgr, on_session_callback *cb, void *args); +int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args); + +#ifdef __cplusplus +} +#endif diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt index ada4303..24cae44 100644 --- a/infra/packet_manager/CMakeLists.txt +++ b/infra/packet_manager/CMakeLists.txt @@ -11,6 +11,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra) -target_link_libraries(packet_manager tuple logger dablooms mq) +target_link_libraries(packet_manager tuple logger dablooms mq exdata) add_subdirectory(test)
\ No newline at end of file diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index 44efca8..2c422d3 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -18,6 +18,7 @@ struct packet_manager_config struct packet_manager_schema { + struct exdata_schema *exdata; struct mq_schema *mq; int topic_id[PACKET_STAGE_MAX]; }; @@ -50,17 +51,17 @@ const char *packet_stage_to_str(enum packet_stage stage) switch (stage) { case PACKET_STAGE_PREROUTING: - return "prerouting"; + return "PACKET_STAGE_PREROUTING"; case PACKET_STAGE_INPUT: - return "input"; + return "PACKET_STAGE_INPUT"; case PACKET_STAGE_FORWARD: - return "forward"; + return "PACKET_STAGE_FORWARD"; case PACKET_STAGE_OUTPUT: - return "output"; + return "PACKET_STAGE_OUTPUT"; case PACKET_STAGE_POSTROUTING: - return "postrouting"; + return "PACKET_STAGE_POSTROUTING"; default: - return "unknown"; + return "PACKET_STAGE_UNKNOWN"; } } @@ -102,7 +103,7 @@ static struct packet_manager_config *packet_manager_config_new(const char *toml_ * packet manager schema ******************************************************************************/ -static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) +static void on_packet_stage_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) { assert(msg); assert(dispatch_arg); @@ -120,7 +121,7 @@ static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_fu } } - ((on_packet_stage_callback *)cb)(stage, pkt, cb_arg); + ((on_packet_stage_callback *)(void *)cb)(stage, pkt, cb_arg); } static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_schema) @@ -138,6 +139,11 @@ static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_sch } } + if (pkt_mgr_schema->exdata) + { + exdata_schema_free(pkt_mgr_schema->exdata); + } + free(pkt_mgr_schema); pkt_mgr_schema 
= NULL; } @@ -152,11 +158,17 @@ static struct packet_manager_schema *packet_manager_schema_new(struct mq_schema return NULL; } - pkt_mgr_schema->mq = mq; + pkt_mgr_schema->exdata = exdata_schema_new(); + if (pkt_mgr_schema->exdata == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create exdata_schema"); + goto error_out; + } + pkt_mgr_schema->mq = mq; for (int i = 0; i < PACKET_STAGE_MAX; i++) { - pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), (on_msg_dispatch_cb_func *)on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL); + pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), &on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL); if (pkt_mgr_schema->topic_id[i] < 0) { PACKET_MANAGER_LOG_ERROR("failed to create topic %s", packet_stage_to_str(i)); @@ -276,7 +288,12 @@ void packet_manager_free(struct packet_manager *pkt_mgr) } } -int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args) +int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg) +{ + return exdata_schema_new_index(pkt_mgr->schema->exdata, name, func, arg); +} + +int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *args) { return mq_schema_subscribe(pkt_mgr->schema->mq, pkt_mgr->schema->topic_id[stage], (on_msg_cb_func *)cb, args); } @@ -291,6 +308,8 @@ void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, str void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt) { struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + struct exdata_runtime *exdata_rt = exdata_runtime_new(pkt_mgr->schema->exdata); + packet_set_user_data(pkt, exdata_rt); runtime->stat.total.pkts_ingress++; 
runtime->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++; @@ -307,6 +326,9 @@ struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t th runtime->stat.total.pkts_egress++; runtime->stat.queue[PACKET_STAGE_MAX].pkts_out++; TAILQ_REMOVE(&runtime->queue[PACKET_STAGE_MAX], pkt, stage_tqe); + + struct exdata_runtime *exdata_rt = packet_get_user_data(pkt); + exdata_runtime_free(exdata_rt); } return pkt; } diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h index f8ac98e..13bdc3b 100644 --- a/infra/packet_manager/packet_manager_internal.h +++ b/infra/packet_manager/packet_manager_internal.h @@ -5,6 +5,7 @@ extern "C" { #endif +#include "stellar/mq.h" #include "stellar/packet_manager.h" #define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1) diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h deleted file mode 100644 index 4fe6a36..0000000 --- a/infra/packet_manager/packet_manager_private.h +++ /dev/null @@ -1,46 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar/packet_manager.h" - -#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1) - -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file); -void packet_manager_free(struct packet_manager *pkt_mgr); - -void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt); -void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); -struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt); -void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt); - -/****************************************************************************** - * for gtest - ******************************************************************************/ - -struct packet_manager_stat -{ - struct - { - uint64_t pkts_ingress; - uint64_t 
pkts_egress; - } total; - struct - { - uint64_t pkts_in; // include the packets that are scheduled - uint64_t pkts_out; // include the packets that are claimed - uint64_t pkts_claim; - uint64_t pkts_schedule; - } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets -} __attribute__((aligned(64))); - -const char *packet_stage_to_str(enum packet_stage stage); -void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime); -struct packet_manager_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime); - -#ifdef __cplusplus -} -#endif diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c index a9d4f80..331afee 100644 --- a/infra/packet_manager/packet_utils.c +++ b/infra/packet_manager/packet_utils.c @@ -5,6 +5,7 @@ #include "log_internal.h" #include "packet_helper.h" #include "packet_internal.h" +#include "stellar/exdata.h" #define PACKET_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet", format, ##__VA_ARGS__) @@ -951,3 +952,15 @@ int packet_is_fragment(const struct packet *pkt) { return (pkt->frag_layer) ? 
1 : 0; } + +void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr) +{ + struct exdata_runtime *exdata_rt = (struct exdata_runtime *)packet_get_user_data(pkt); + exdata_set(exdata_rt, idx, ex_ptr); +} + +void *packet_get_exdata(struct packet *pkt, int idx) +{ + struct exdata_runtime *exdata_rt = (struct exdata_runtime *)packet_get_user_data(pkt); + return exdata_get(exdata_rt, idx); +} diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt index 5404fde..a045740 100644 --- a/infra/session_manager/CMakeLists.txt +++ b/infra/session_manager/CMakeLists.txt @@ -4,12 +4,13 @@ add_library(session_manager session_table.c session_timer.c session_filter.c - session_manager.c session_transition.c + session_manager_runtime.c + session_manager.c ) target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(session_manager timeout packet_manager tcp_reassembly) +target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata) add_subdirectory(test)
\ No newline at end of file diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index c56258d..e8b2a08 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -1,1462 +1,430 @@ -#include <time.h> -#include <stdlib.h> #include <assert.h> -#include <errno.h> +#include <stdlib.h> + +#include "stellar/exdata.h" +#include "stellar/packet_manager.h" +#include "stellar/session_manager.h" +#include "stellar/module_manager.h" #include "utils.h" -#include "packet_helper.h" -#include "packet_filter.h" #include "session_internal.h" -#include "session_pool.h" -#include "session_table.h" -#include "session_timer.h" -#include "session_filter.h" -#include "session_manager.h" -#include "session_transition.h" +#include "session_manager_runtime.h" -#define SESSION_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session", format, ##__VA_ARGS__) -#define SESSION_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session", format, ##__VA_ARGS__) -#define SESSION_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session", format, ##__VA_ARGS__) +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wunused-function" -struct snowflake -{ - uint64_t seed; - uint64_t sequence; -}; +#define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__) +#define SESSION_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager", format, ##__VA_ARGS__) +#define SESSION_MANAGER_LOG_INFO(format, ...) 
STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__) -struct session_manager_runtime +struct session_manager_schema { - struct session_list evicte_list; - struct session_pool *sess_pool; - struct session_timer *sess_timer; - struct session_table *tcp_sess_table; - struct session_table *udp_sess_table; - - struct packet_filter *dup_pkt_filter; - struct session_filter *evicte_sess_filter; + struct exdata_schema *exdata; + struct mq_schema *mq; - struct session_manager_stat stat; - struct session_manager_config cfg; + int pkt_exdata_idx; - /* - * only used for session_set_discard() or session_manager_runtime_record_duplicated_packet(), - * because the function is called by plugin and has no time input. - */ - uint64_t now_ms; - uint64_t last_clean_expired_sess_ts; - struct snowflake *sf; + int topic_id_tcp; + int topic_id_udp; + int topic_id_ctrl_pkt; + int topic_id_tcp_stream; }; -#define EVICTE_SESSION_BURST (RX_BURST_MAX) - -/****************************************************************************** - * session manager stat macro - ******************************************************************************/ - -#define SESS_MGR_STAT_INC(stat, state, proto) \ - { \ - switch ((state)) \ - { \ - case SESSION_STATE_OPENING: \ - (stat)->proto##_sess_opening++; \ - break; \ - case SESSION_STATE_ACTIVE: \ - (stat)->proto##_sess_active++; \ - break; \ - case SESSION_STATE_CLOSING: \ - (stat)->proto##_sess_closing++; \ - break; \ - case SESSION_STATE_DISCARD: \ - (stat)->proto##_sess_discard++; \ - break; \ - case SESSION_STATE_CLOSED: \ - (stat)->proto##_sess_closed++; \ - break; \ - default: \ - break; \ - } \ - } - -#define SESS_MGR_STAT_DEC(stat, state, proto) \ - { \ - switch ((state)) \ - { \ - case SESSION_STATE_OPENING: \ - (stat)->proto##_sess_opening--; \ - break; \ - case SESSION_STATE_ACTIVE: \ - (stat)->proto##_sess_active--; \ - break; \ - case SESSION_STATE_CLOSING: \ - (stat)->proto##_sess_closing--; \ - break; \ - case 
SESSION_STATE_DISCARD: \ - (stat)->proto##_sess_discard--; \ - break; \ - case SESSION_STATE_CLOSED: \ - (stat)->proto##_sess_closed--; \ - break; \ - default: \ - break; \ - } \ - } - -#define SESS_MGR_STAT_UPDATE(stat, curr, next, proto) \ - { \ - if (curr != next) \ - { \ - SESS_MGR_STAT_DEC(stat, curr, proto); \ - SESS_MGR_STAT_INC(stat, next, proto); \ - } \ - } - -/****************************************************************************** - * snowflake - ******************************************************************************/ - -static struct snowflake *snowflake_new(uint64_t seed) -{ - struct snowflake *sf = (struct snowflake *)calloc(1, sizeof(struct snowflake)); - if (sf == NULL) - { - return NULL; - } - - sf->seed = seed & 0xFFFFF; - sf->sequence = 0; - - return sf; -} - -static void snowflake_free(struct snowflake *sf) -{ - if (sf != NULL) - { - free(sf); - sf = NULL; - } -} - -/* - * high -> low - * - * +------+------------------+----------------+------------------------+---------------------------+ - * | 1bit | 12bit device_id | 8bit thread_id | 28bit timestamp in sec | 15bit sequence per thread | - * +------+------------------+----------------+------------------------+---------------------------+ - */ - -#define MAX_ID_PER_THREAD (32768) -#define MAX_ID_BASE_TIME (268435456L) - -static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec) +struct session_manager { - uint64_t id = 0; - uint64_t id_per_thread = (sf->sequence++) % MAX_ID_PER_THREAD; - uint64_t id_base_time = now_sec % MAX_ID_BASE_TIME; - - id = (sf->seed << 43) | (id_base_time << 15) | (id_per_thread); - - return id; -} + uint16_t thread_num; + struct session_manager_config *cfg; + struct session_manager_schema *schema; + struct session_manager_runtime *runtime[MAX_THREAD_NUM]; + struct stellar_module_manager *mod_mgr; +}; /****************************************************************************** - * TCP utils + * callback 
******************************************************************************/ -static void tcp_clean(struct session_manager_runtime *sess_mgr_rt, struct session *sess) -{ - struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[FLOW_TYPE_C2S].assembler; - struct tcp_reassembly *s2c_ssembler = sess->tcp_halfs[FLOW_TYPE_S2C].assembler; - struct tcp_segment *seg; - if (c2s_ssembler) - { - while ((seg = tcp_reassembly_expire(c2s_ssembler, UINT64_MAX))) - { - session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_SEGMENTS_RELEASED, 1); - session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_PAYLOADS_RELEASED, seg->len); - sess_mgr_rt->stat.tcp_segs_freed++; - tcp_segment_free(seg); - } - tcp_reassembly_free(c2s_ssembler); - } - if (s2c_ssembler) - { - while ((seg = tcp_reassembly_expire(s2c_ssembler, UINT64_MAX))) - { - session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_SEGMENTS_RELEASED, 1); - session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_PAYLOADS_RELEASED, seg->len); - sess_mgr_rt->stat.tcp_segs_freed++; - tcp_segment_free(seg); - } - tcp_reassembly_free(s2c_ssembler); - } -} - -static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session *sess) -{ - if (!sess_mgr_rt->cfg.tcp_reassembly.enable) - { - return 0; - } - - sess->tcp_halfs[FLOW_TYPE_C2S].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); - sess->tcp_halfs[FLOW_TYPE_S2C].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); - if (sess->tcp_halfs[FLOW_TYPE_C2S].assembler == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].assembler == NULL) - { - tcp_clean(sess_mgr_rt, sess); - return -1; - } - - SESSION_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p", - session_get_id(sess), session_get0_readable_addr(sess), - sess->tcp_halfs[FLOW_TYPE_C2S].assembler, - sess->tcp_halfs[FLOW_TYPE_S2C].assembler); - - return 0; -} - -static void 
tcp_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum flow_type type, const struct layer_private *tcp_layer) -{ - struct tcp_segment *seg; - struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr; - struct tcp_half *half = &sess->tcp_halfs[type]; - uint8_t flags = tcp_hdr_get_flags(hdr); - uint16_t len = tcp_layer->pld_len; - - if ((flags & TH_SYN) && half->isn == 0) - { - half->isn = tcp_hdr_get_seq(hdr); - } - half->flags = flags; - half->history |= flags; - half->seq = tcp_hdr_get_seq(hdr); - half->ack = tcp_hdr_get_ack(hdr); - half->len = tcp_layer->pld_len; - - if (!sess_mgr_rt->cfg.tcp_reassembly.enable) - { - if (len) - { - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len); - sess_mgr_rt->stat.tcp_segs_input++; - - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len); - sess_mgr_rt->stat.tcp_segs_inorder++; - - half->in_order.data = tcp_layer->pld_ptr; - half->in_order.len = len; - half->in_order_ref = 0; - } - return; - } - - if (unlikely(flags & TH_SYN)) - { - // len > 0 is SYN with data (TCP Fast Open) - tcp_reassembly_set_recv_next(half->assembler, len ? 
half->seq : half->seq + 1); - } - - seg = tcp_reassembly_expire(half->assembler, sess_mgr_rt->now_ms); - if (seg) - { - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_EXPIRED, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_EXPIRED, seg->len); - sess_mgr_rt->stat.tcp_segs_timeout++; - - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RELEASED, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RELEASED, seg->len); - sess_mgr_rt->stat.tcp_segs_freed++; - - tcp_segment_free(seg); - } - - if (len) - { - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len); - sess_mgr_rt->stat.tcp_segs_input++; - - uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler); - // in order - if (half->seq == rcv_nxt) - { - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len); - sess_mgr_rt->stat.tcp_segs_inorder++; - - half->in_order.data = tcp_layer->pld_ptr; - half->in_order.len = len; - half->in_order_ref = 0; - tcp_reassembly_inc_recv_next(half->assembler, len); - } - // retransmission - else if (uint32_before(uint32_add(half->seq, len), rcv_nxt)) - { - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len); - sess_mgr_rt->stat.tcp_segs_retransmited++; - } - else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len))) - { - switch (tcp_reassembly_push(half->assembler, seg, sess_mgr_rt->now_ms)) - { - case -2: - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len); - sess_mgr_rt->stat.tcp_segs_retransmited++; - tcp_segment_free(seg); - break; - case -1: - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len); - sess_mgr_rt->stat.tcp_segs_omitted_too_many++; - tcp_segment_free(seg); - break; - case 0: - 
session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len); - sess_mgr_rt->stat.tcp_segs_buffered++; - break; - case 1: - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_OVERLAP, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_OVERLAP, len); - sess_mgr_rt->stat.tcp_segs_overlapped++; - - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len); - sess_mgr_rt->stat.tcp_segs_buffered++; - break; - default: - assert(0); - break; - } - } - else - { - session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1); - session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len); - sess_mgr_rt->stat.tcp_segs_omitted_too_many++; - } - } +static void on_session_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) +{ + struct session *sess = (struct session *)msg; + struct packet *pkt = (struct packet *)session_get0_current_packet(sess); + + ((on_session_callback *)(void *)cb)(sess, pkt, cb_args); +} + +static void on_tcp_stream_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args) +{ + struct tcp_segment *seg = (struct tcp_segment *)msg; + + ((on_tcp_stream_callback *)(void *)cb)(seg->user_data, seg->data, seg->len, cb_args); +} + +static void on_tcp_stream_free(void *msg, void *args) +{ + struct tcp_segment *seg = (struct tcp_segment *)msg; + struct session *sess = (struct session *)seg->user_data; + + session_free_tcp_segment(sess, seg); +} + +static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void *args) +{ + struct session_manager *sess_mgr = (struct session_manager *)args; + struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; + int thread_id = stellar_module_manager_get_thread_id(mod_mgr); + struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr); + struct session_manager_runtime *sess_mgr_rt = 
sess_mgr->runtime[thread_id]; + + /* + * We use the system's real time instead of monotonic time for the following reasons: + * -> Session creation/closure times require real time (e.g., for logging session activities). + * -> Session ID generation relies on real time (e.g., for reverse calculating session creation time from the session ID). + * + * Note: Modifying the system time will affect the timing wheel, impacting session expiration, and TCP reassembly expiration. + * Suggestion: After modifying the system time, restart the service to ensure consistent timing. + */ + uint64_t now_ms = clock_get_real_time_ms(); + + struct tcp_segment *seg = NULL; + struct session *sess = session_manager_runtime_lookup_session_by_packet(sess_mgr_rt, pkt); + if (sess == NULL) + { + sess = session_manager_runtime_new_session(sess_mgr_rt, pkt, now_ms); + if (sess == NULL) + { + goto fast_path; + } + else + { + session_set_user_data(sess, exdata_runtime_new(sess_mgr->schema->exdata)); + goto slow_path; + } + } + else + { + if (session_manager_runtime_update_session(sess_mgr_rt, sess, pkt, now_ms) == -1) + { + goto fast_path; + } + else + { + goto slow_path; + } + } + +slow_path: + if (session_get_type(sess) == SESSION_TYPE_TCP) + { + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp, sess); + while ((seg = session_get_tcp_segment(sess))) + { + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp_stream, seg); + } + } + else + { + mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess); + } + + packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, sess); + +fast_path: + packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, NULL); +} + +static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *args) +{ + struct session_manager *sess_mgr = (struct session_manager *)args; + struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx); + if (sess) + { + enum flow_type type = 
session_get_flow_type(sess); + int is_ctrl = packet_is_ctrl(pkt); + uint16_t len = packet_get_raw_len(pkt); + switch (packet_get_action(pkt)) + { + case PACKET_ACTION_DROP: + session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1); + session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len); + break; + case PACKET_ACTION_FORWARD: + session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1); + session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len); + break; + default: + assert(0); + break; + } + + session_set_current_packet(sess, NULL); + session_set_flow_type(sess, FLOW_TYPE_NONE); + } +} + +static int on_polling(void *args) +{ + struct session_manager *sess_mgr = (struct session_manager *)args; + struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; + int thread_id = stellar_module_manager_get_thread_id(mod_mgr); + struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; + uint64_t now_ms = clock_get_real_time_ms(); + +#define MAX_CLEANED_SESS 1024 + struct session *sess = NULL; + struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL}; + struct exdata_runtime *exdata_rt = NULL; + + uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned_sess, MAX_CLEANED_SESS); + for (uint64_t j = 0; j < used; j++) + { + sess = cleaned_sess[j]; + exdata_rt = (struct exdata_runtime *)session_get_user_data(sess); + exdata_runtime_free(exdata_rt); + session_manager_runtime_free_session(sess_mgr_rt, sess); + } + + // TODO + // ouput stat to fs4 + session_manager_runtime_print_stat(sess_mgr_rt); + + if (used == MAX_CLEANED_SESS) + { + return 1; + } + else + { + return 0; + } } /****************************************************************************** - * session flow + * session manager schema 
******************************************************************************/ -static enum flow_type identify_flow_type_by_port(uint16_t src_port, uint16_t dst_port) -{ - // big port is client - if (src_port > dst_port) - { - return FLOW_TYPE_C2S; - } - else if (src_port < dst_port) - { - return FLOW_TYPE_S2C; - } - else - { - // if port is equal, first packet is C2S - return FLOW_TYPE_C2S; - } -} - -static enum flow_type identify_flow_type_by_history(const struct session *sess, const struct tuple6 *key) -{ - if (tuple6_cmp(session_get_tuple6(sess), key) == 0) - { - return FLOW_TYPE_C2S; - } - else - { - return FLOW_TYPE_S2C; - } +void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema) +{ + if (sess_mgr_schema) + { + if (sess_mgr_schema->mq) + { + mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp); + mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_udp); + mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_ctrl_pkt); + mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp_stream); + } + exdata_schema_free(sess_mgr_schema->exdata); + + free(sess_mgr_schema); + sess_mgr_schema = NULL; + } +} + +struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *subscribe_args) +{ + if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, subscribe_args)) + { + SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD"); + return NULL; + } + if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, subscribe_args)) + { + SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT"); + return NULL; + } + + struct session_manager_schema *sess_mgr_schema = calloc(1, sizeof(struct session_manager_schema)); + if (sess_mgr_schema == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager_schema"); + return NULL; + } + + 
sess_mgr_schema->exdata = exdata_schema_new(); + if (sess_mgr_schema->exdata == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema"); + goto error_out; + } + + // TODO register polling + + sess_mgr_schema->mq = mq; + sess_mgr_schema->pkt_exdata_idx = packet_manager_new_packet_exdata_index(pkt_mgr, "session_manager", NULL, NULL); + if (sess_mgr_schema->pkt_exdata_idx == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index"); + goto error_out; + } + + sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "TCP", &on_session_dispatch, NULL, NULL, NULL); + if (sess_mgr_schema->topic_id_tcp == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create topic TCP"); + goto error_out; + } + sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "UDP", &on_session_dispatch, NULL, NULL, NULL); + if (sess_mgr_schema->topic_id_udp == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create topic UDP"); + goto error_out; + } + sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "CTRL_PKT", &on_session_dispatch, NULL, NULL, NULL); + if (sess_mgr_schema->topic_id_ctrl_pkt == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create topic CTRL_PKT"); + goto error_out; + } + sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL); + if (sess_mgr_schema->topic_id_tcp_stream == -1) + { + SESSION_MANAGER_LOG_ERROR("failed to create topic TCP_STREAM"); + goto error_out; + } + + return sess_mgr_schema; + +error_out: + session_manager_schema_free(sess_mgr_schema); + return NULL; } /****************************************************************************** - * bypass packet -- table limit / session evicted / duplicated packet + * session manager ******************************************************************************/ -static int 
session_manager_runtime_bypass_packet_on_tcp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key) +void session_manager_free(struct session_manager *sess_mgr) { - if (key->ip_proto == IPPROTO_TCP && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max) - { - sess_mgr_rt->stat.tcp_pkts_bypass_table_full++; - return 1; - } - return 0; -} + if (sess_mgr) + { + for (int i = 0; i < sess_mgr->thread_num; i++) + { + session_manager_runtime_free(sess_mgr->runtime[i]); + } -static int session_manager_runtime_bypass_packet_on_udp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key) -{ - if (key->ip_proto == IPPROTO_UDP && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max) - { - sess_mgr_rt->stat.udp_pkts_bypass_table_full++; - return 1; - } - return 0; + session_manager_schema_free(sess_mgr->schema); + session_manager_config_free(sess_mgr->cfg); + free(sess_mgr); + } } -static int session_manager_runtime_bypass_packet_on_session_evicted(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key) +struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) { - if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable && session_filter_lookup(sess_mgr_rt->evicte_sess_filter, key, sess_mgr_rt->now_ms)) - { - sess_mgr_rt->stat.udp_pkts_bypass_session_evicted++; - return 1; - } - - return 0; -} - -static int session_manager_runtime_bypass_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key) -{ - if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable == 0) - { - return 0; - } - - enum flow_type type = identify_flow_type_by_history(sess, key); - if (session_get_stat(sess, type, STAT_RAW_PACKETS_RECEIVED) < 3 || session_has_duplicate_traffic(sess)) - { - if (packet_filter_lookup(sess_mgr_rt->dup_pkt_filter, pkt, 
sess_mgr_rt->now_ms)) - { - session_inc_stat(sess, type, STAT_DUPLICATE_PACKETS_BYPASS, 1); - session_inc_stat(sess, type, STAT_DUPLICATE_BYTES_BYPASS, packet_get_raw_len(pkt)); - switch (session_get_type(sess)) - { - case SESSION_TYPE_TCP: - sess_mgr_rt->stat.tcp_pkts_bypass_duplicated++; - break; - case SESSION_TYPE_UDP: - sess_mgr_rt->stat.udp_pkts_bypass_duplicated++; - break; - default: - assert(0); - break; - } - session_set_duplicate_traffic(sess); - - session_set_current_packet(sess, pkt); - session_set_flow_type(sess, type); - return 1; - } - else - { - packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms); - return 0; - } - } - - return 0; -} - -void session_manager_runtime_record_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt) -{ - if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) - { - packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms); - } -} - -/****************************************************************************** - * config -- new / free / print - ******************************************************************************/ - -struct session_manager_config *session_manager_config_new(const char *toml_file) -{ - if (toml_file == NULL) - { - return NULL; - } - - struct session_manager_config *sess_mgr_cfg = (struct session_manager_config *)calloc(1, sizeof(struct session_manager_config)); - if (sess_mgr_cfg == NULL) - { - return NULL; - } - - int ret = 0; - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); + uint16_t thread_num; + uint64_t instance_id; + uint64_t now_ms = clock_get_real_time_ms(); - ret += load_and_validate_toml_integer_config(toml_file, 
"session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1); + if (load_and_validate_toml_integer_config(toml_file, "instance.id", (uint64_t *)&instance_id, 0, 4095)) + { + return NULL; + } + if (load_and_validate_toml_integer_config(toml_file, "packet.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM)) + { + return NULL; + } - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024); + struct session_manager *sess_mgr = calloc(1, sizeof(struct session_manager)); + if (sess_mgr == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager"); + return NULL; + } - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", 
(uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000); + sess_mgr->cfg = session_manager_config_new(toml_file); + if (sess_mgr->cfg == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to create session_manager_config"); + goto error_out; + } - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000); + sess_mgr->schema = session_manager_schema_new(pkt_mgr, mq_schema, sess_mgr); + if (sess_mgr->schema == NULL) + { + goto error_out; + } - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000); - ret += load_and_validate_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0); + sess_mgr->thread_num = thread_num; + for (int i = 0; i < sess_mgr->thread_num; i++) + { + sess_mgr->cfg->session_id_seed = instance_id << 8 | i; + sess_mgr->runtime[i] = session_manager_runtime_new(sess_mgr->cfg, now_ms); + if (sess_mgr->runtime[i] == NULL) + { + 
SESSION_MANAGER_LOG_ERROR("failed to create session_manager_runtime"); + goto error_out; + } + } - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000); - ret += load_and_validate_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0); + return sess_mgr; - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000); - ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512); - - if (ret != 0) - { - session_manager_config_free(sess_mgr_cfg); - return NULL; - } - - return sess_mgr_cfg; +error_out: + session_manager_free(sess_mgr); + return NULL; } -void session_manager_config_free(struct session_manager_config *sess_mgr_cfg) +int session_manager_new_packet_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg) { - if (sess_mgr_cfg) - { - free(sess_mgr_cfg); - sess_mgr_cfg = NULL; - } + return exdata_schema_new_index(sess_mgr->schema->exdata, name, func, arg); } -void session_manager_config_print(struct 
session_manager_config *sess_mgr_cfg) +int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_callback *cb, void *args) { - if (sess_mgr_cfg) - { - // max session number - SESSION_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max); - SESSION_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max); - - // session overload - SESSION_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit); - SESSION_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit); - - // TCP timeout - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init); - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake); - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data); - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed); - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait); - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default); - SESSION_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst); - - // UDP timeout - SESSION_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data); - SESSION_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default); - - // limit - SESSION_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms); - SESSION_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max); - - // duplicated packet filter - SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", 
sess_mgr_cfg->duplicated_packet_bloom_filter.enable); - SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity); - SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms); - SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate); - - // eviction session filter - SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable); - SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity); - SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms); - SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate); - - // TCP reassembly - SESSION_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable); - SESSION_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms); - SESSION_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max); - } + return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp, (on_msg_cb_func *)(void *)cb, args); } -/****************************************************************************** - * runtime -- new / free - ******************************************************************************/ - -struct session_manager_runtime *session_manager_runtime_new(const struct session_manager_config *sess_mgr_cfg, uint64_t now_ms) +int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_callback *cb, void *args) { 
- struct session_manager_runtime *sess_mgr_rt = (struct session_manager_runtime *)calloc(1, sizeof(struct session_manager_runtime)); - if (sess_mgr_rt == NULL) - { - return NULL; - } - memcpy(&sess_mgr_rt->cfg, sess_mgr_cfg, sizeof(struct session_manager_config)); - - sess_mgr_rt->sess_pool = session_pool_new(sess_mgr_rt->cfg.tcp_session_max + sess_mgr_rt->cfg.udp_session_max); - sess_mgr_rt->tcp_sess_table = session_table_new(); - sess_mgr_rt->udp_sess_table = session_table_new(); - sess_mgr_rt->sess_timer = session_timer_new(now_ms); - if (sess_mgr_rt->sess_pool == NULL || sess_mgr_rt->tcp_sess_table == NULL || sess_mgr_rt->udp_sess_table == NULL || sess_mgr_rt->sess_timer == NULL) - { - goto error; - } - if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) - { - sess_mgr_rt->evicte_sess_filter = session_filter_new(sess_mgr_rt->cfg.evicted_session_bloom_filter.capacity, - sess_mgr_rt->cfg.evicted_session_bloom_filter.time_window_ms, - sess_mgr_rt->cfg.evicted_session_bloom_filter.error_rate, now_ms); - if (sess_mgr_rt->evicte_sess_filter == NULL) - { - goto error; - } - } - if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) - { - sess_mgr_rt->dup_pkt_filter = packet_filter_new(sess_mgr_rt->cfg.duplicated_packet_bloom_filter.capacity, - sess_mgr_rt->cfg.duplicated_packet_bloom_filter.time_window_ms, - sess_mgr_rt->cfg.duplicated_packet_bloom_filter.error_rate, now_ms); - if (sess_mgr_rt->dup_pkt_filter == NULL) - { - goto error; - } - } - sess_mgr_rt->sf = snowflake_new(sess_mgr_rt->cfg.session_id_seed); - if (sess_mgr_rt->sf == NULL) - { - goto error; - } - - TAILQ_INIT(&sess_mgr_rt->evicte_list); - session_transition_init(); - sess_mgr_rt->now_ms = now_ms; - sess_mgr_rt->last_clean_expired_sess_ts = now_ms; - - return sess_mgr_rt; - -error: - session_manager_runtime_free(sess_mgr_rt); - return NULL; + return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_udp, (on_msg_cb_func *)(void *)cb, args); } -void 
session_manager_runtime_free(struct session_manager_runtime *sess_mgr_rt) +int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_callback *cb, void *args) { - struct session *sess; - if (sess_mgr_rt) - { - // free all evicted session - while ((sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list))) - { - TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe); - session_manager_runtime_free_session(sess_mgr_rt, sess); - } - // free all udp session - while (sess_mgr_rt->udp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->udp_sess_table))) - { - session_manager_runtime_free_session(sess_mgr_rt, sess); - } - // free all tcp session - while (sess_mgr_rt->tcp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table))) - { - session_manager_runtime_free_session(sess_mgr_rt, sess); - } - if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) - { - session_filter_free(sess_mgr_rt->evicte_sess_filter); - } - if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) - { - packet_filter_free(sess_mgr_rt->dup_pkt_filter); - } - snowflake_free(sess_mgr_rt->sf); - session_timer_free(sess_mgr_rt->sess_timer); - session_table_free(sess_mgr_rt->udp_sess_table); - session_table_free(sess_mgr_rt->tcp_sess_table); - session_pool_free(sess_mgr_rt->sess_pool); - free(sess_mgr_rt); - sess_mgr_rt = NULL; - } + return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_ctrl_pkt, (on_msg_cb_func *)(void *)cb, args); } -/****************************************************************************** - * session -- new / free / lookup / updata / expire / evicte / clean - ******************************************************************************/ - -static void session_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum flow_type type) +int session_manager_subscribe_tcp_stream(struct session_manager 
*sess_mgr, on_tcp_stream_callback *cb, void *args) { - if (session_get_current_state(sess) == SESSION_STATE_INIT) - { - uint64_t sess_id = snowflake_generate(sess_mgr_rt->sf, sess_mgr_rt->now_ms / 1000); - session_set_id(sess, sess_id); - enum packet_direction pkt_dir = packet_get_direction(pkt); - if (type == FLOW_TYPE_C2S) - { - session_set_tuple6(sess, key); - if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External - { - session_set_direction(sess, SESSION_DIRECTION_OUTBOUND); - } - else - { - session_set_direction(sess, SESSION_DIRECTION_INBOUND); - } - tuple6_to_str(key, sess->tuple_str, sizeof(sess->tuple_str)); - } - else - { - struct tuple6 out; - tuple6_reverse(key, &out); - session_set_tuple6(sess, &out); - if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External - { - session_set_direction(sess, SESSION_DIRECTION_INBOUND); - } - else - { - session_set_direction(sess, SESSION_DIRECTION_OUTBOUND); - } - tuple6_to_str(&out, sess->tuple_str, sizeof(sess->tuple_str)); - } + assert(sess_mgr); - session_set_timestamp(sess, SESSION_TIMESTAMP_START, sess_mgr_rt->now_ms); - switch (key->ip_proto) - { - case IPPROTO_TCP: - session_set_type(sess, SESSION_TYPE_TCP); - break; - case IPPROTO_UDP: - session_set_type(sess, SESSION_TYPE_UDP); - break; - default: - assert(0); - break; - } - } - - session_inc_stat(sess, type, STAT_RAW_PACKETS_RECEIVED, 1); - session_inc_stat(sess, type, STAT_RAW_BYTES_RECEIVED, packet_get_raw_len(pkt)); - - if (!session_get_first_packet(sess, type)) - { - session_set_first_packet(sess, type, packet_dup(pkt)); - session_set_route_ctx(sess, type, packet_get_route_ctx(pkt)); - session_set_sids(sess, type, packet_get_sids(pkt)); - } - - session_set_current_packet(sess, pkt); - session_set_flow_type(sess, type); - session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, sess_mgr_rt->now_ms); - session_set_current_state(sess, next_state); -} - -static void session_manager_runtime_evicte_session(struct session_manager_runtime 
*sess_mgr_rt, struct session *sess, int reason) -{ - if (sess == NULL) - { - return; - } - - // when session add to evicted queue, session lifetime is over - enum session_state curr_state = session_get_current_state(sess); - enum session_state next_state = session_transition_run(curr_state, reason); - session_transition_log(sess, curr_state, next_state, reason); - session_set_current_state(sess, next_state); - if (!session_get_closing_reason(sess)) - { - if (reason == PORT_REUSE_EVICT) - { - session_set_closing_reason(sess, CLOSING_BY_PORT_REUSE_EVICTED); - } - if (reason == LRU_EVICT) - { - session_set_closing_reason(sess, CLOSING_BY_LRU_EVICTED); - } - } - session_timer_del(sess_mgr_rt->sess_timer, sess); - TAILQ_INSERT_TAIL(&sess_mgr_rt->evicte_list, sess, evicte_tqe); - - switch (session_get_type(sess)) - { - case SESSION_TYPE_TCP: - SESSION_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess)); - session_table_del(sess_mgr_rt->tcp_sess_table, sess); - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); - sess_mgr_rt->stat.tcp_sess_evicted++; - break; - case SESSION_TYPE_UDP: - SESSION_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess)); - session_table_del(sess_mgr_rt->udp_sess_table, sess); - if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) - { - session_filter_add(sess_mgr_rt->evicte_sess_filter, session_get_tuple6(sess), sess_mgr_rt->now_ms); - } - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); - sess_mgr_rt->stat.udp_sess_evicted++; - break; - default: - assert(0); - break; - } -} - -static struct session *session_manager_runtime_lookup_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key) -{ - struct session *sess = session_table_find_tuple6(sess_mgr_rt->tcp_sess_table, key, 0); - if (sess == NULL) - { - return NULL; - } - - const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP); - const struct 
tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; - uint8_t flags = tcp_hdr_get_flags(hdr); - if ((flags & TH_SYN) == 0) - { - return sess; - } - - enum flow_type type = identify_flow_type_by_history(sess, key); - struct tcp_half *half = &sess->tcp_halfs[type]; - if ((half->isn && half->isn != tcp_hdr_get_seq(hdr)) || // recv SYN with different ISN - ((half->history & TH_FIN) || (half->history & TH_RST))) // recv SYN after FIN or RST - { - // TCP port reuse, evict old session - session_manager_runtime_evicte_session(sess_mgr_rt, sess, PORT_REUSE_EVICT); - return NULL; - } - else - { - // TCP SYN retransmission - return sess; - } -} - -static struct session *session_manager_runtime_new_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key) -{ - const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP); - const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; - uint8_t flags = tcp_hdr_get_flags(hdr); - if (!(flags & TH_SYN)) - { - sess_mgr_rt->stat.tcp_pkts_bypass_session_not_found++; - return NULL; - } - - // tcp table full evict old session - if (sess_mgr_rt->cfg.evict_old_on_tcp_table_limit && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max - EVICTE_SESSION_BURST) - { - struct session *evic_sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table); - session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT); - } - - enum flow_type type = (flags & TH_ACK) ? 
FLOW_TYPE_S2C : FLOW_TYPE_C2S; - struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool); - if (sess == NULL) - { - assert(0); - return NULL; - } - session_init(sess); - sess->sess_mgr_rt = sess_mgr_rt; - sess->sess_mgr_stat = &sess_mgr_rt->stat; - - enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN); - session_update(sess_mgr_rt, sess, next_state, pkt, key, type); - session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN); - - if (tcp_init(sess_mgr_rt, sess) == -1) - { - assert(0); - session_pool_push(sess_mgr_rt->sess_pool, sess); - return NULL; - } - tcp_update(sess_mgr_rt, sess, type, tcp_layer); - - uint64_t timeout = (flags & TH_ACK) ? sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init; - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout); - session_table_add(sess_mgr_rt->tcp_sess_table, sess); - - if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) - { - packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms); - } - - SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, tcp); - sess_mgr_rt->stat.tcp_sess_used++; - sess_mgr_rt->stat.history_tcp_sessions++; - - return sess; -} - -static struct session *session_manager_runtime_new_udp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key) -{ - // udp table full evict old session - if (sess_mgr_rt->cfg.evict_old_on_udp_table_limit && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max - EVICTE_SESSION_BURST) - { - struct session *evic_sess = session_table_find_lru(sess_mgr_rt->udp_sess_table); - session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT); - } - - struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool); - if (sess == NULL) - { - assert(sess); - return NULL; - } - session_init(sess); - sess->sess_mgr_rt = sess_mgr_rt; - sess->sess_mgr_stat = &sess_mgr_rt->stat; - - enum 
flow_type type = identify_flow_type_by_port(ntohs(key->src_port), ntohs(key->dst_port)); - enum session_state next_state = session_transition_run(SESSION_STATE_INIT, UDP_DATA); - session_update(sess_mgr_rt, sess, next_state, pkt, key, type); - session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA); - - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data); - session_table_add(sess_mgr_rt->udp_sess_table, sess); - - SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, udp); - sess_mgr_rt->stat.udp_sess_used++; - sess_mgr_rt->stat.history_udp_sessions++; - - return sess; -} - -static int session_manager_runtime_update_tcp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key) -{ - const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP); - const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; - enum flow_type type = identify_flow_type_by_history(sess, key); - uint8_t flags = tcp_hdr_get_flags(hdr); - int inputs = 0; - inputs |= (flags & TH_SYN) ? TCP_SYN : NONE; - inputs |= (flags & TH_FIN) ? TCP_FIN : NONE; - inputs |= (flags & TH_RST) ? TCP_RST : NONE; - inputs |= tcp_layer->pld_len ? TCP_DATA : NONE; - - // update state - enum session_state curr_state = session_get_current_state(sess); - enum session_state next_state = session_transition_run(curr_state, inputs); - - // update session - session_update(sess_mgr_rt, sess, next_state, pkt, key, type); - session_transition_log(sess, curr_state, next_state, inputs); - - // update tcp - tcp_update(sess_mgr_rt, sess, type, tcp_layer); - - // set closing reason - if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess)) - { - if (flags & TH_FIN) - { - session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? 
CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN)); - } - if (flags & TH_RST) - { - session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST)); - } - } - - // update timeout - struct tcp_half *curr = &sess->tcp_halfs[type]; - struct tcp_half *peer = &sess->tcp_halfs[(type == FLOW_TYPE_C2S ? FLOW_TYPE_S2C : FLOW_TYPE_C2S)]; - uint64_t timeout = 0; - switch (next_state) - { - case SESSION_STATE_OPENING: - if (flags & TH_SYN) - { - timeout = (flags & TH_ACK) ? sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init; - } - else - { - timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data; - } - break; - case SESSION_STATE_ACTIVE: - timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data; - break; - case SESSION_STATE_CLOSING: - if (flags & TH_FIN) - { - timeout = (peer->history & TH_FIN) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.half_closed; - } - else if (flags & TH_RST) - { - // if fin is received, the expected sequence number should be increased by 1 - uint32_t expected = (peer->history & TH_FIN) ? peer->ack + 1 : peer->ack; - timeout = (expected == curr->seq) ? 
sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.unverified_rst; - } - else - { - timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data; - } - break; - case SESSION_STATE_DISCARD: - timeout = sess_mgr_rt->cfg.tcp_timeout_ms.discard_default; - break; - default: - assert(0); - break; - } - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout); - - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); - - return 0; -} - -static int session_manager_runtime_update_udp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key) -{ - enum flow_type type = identify_flow_type_by_history(sess, key); - enum session_state curr_state = session_get_current_state(sess); - enum session_state next_state = session_transition_run(curr_state, UDP_DATA); - session_update(sess_mgr_rt, sess, next_state, pkt, key, type); - session_transition_log(sess, curr_state, next_state, UDP_DATA); - - if (session_get_current_state(sess) == SESSION_STATE_DISCARD) - { - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default); - } - else - { - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data); - } - - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); - - return 0; -} - -struct session *session_manager_runtime_new_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, uint64_t now_ms) -{ - sess_mgr_rt->now_ms = now_ms; - - struct tuple6 key; - if (packet_get_innermost_tuple6(pkt, &key)) - { - return NULL; - } - switch (key.ip_proto) - { - case IPPROTO_TCP: - if (session_manager_runtime_bypass_packet_on_tcp_table_limit(sess_mgr_rt, &key)) - { - return NULL; - } - return session_manager_runtime_new_tcp_session(sess_mgr_rt, pkt, &key); - case IPPROTO_UDP: - if 
(session_manager_runtime_bypass_packet_on_session_evicted(sess_mgr_rt, &key)) - { - return NULL; - } - if (session_manager_runtime_bypass_packet_on_udp_table_limit(sess_mgr_rt, &key)) - { - return NULL; - } - return session_manager_runtime_new_udp_session(sess_mgr_rt, pkt, &key); - default: - return NULL; - } -} - -void session_manager_runtime_free_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess) -{ - if (sess) - { - SESSION_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); - - session_timer_del(sess_mgr_rt->sess_timer, sess); - switch (session_get_type(sess)) - { - case SESSION_TYPE_TCP: - tcp_clean(sess_mgr_rt, sess); - if (session_table_find_sessid(sess_mgr_rt->tcp_sess_table, session_get_id(sess), 0) == sess) - { - session_table_del(sess_mgr_rt->tcp_sess_table, sess); - } - SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), tcp); - sess_mgr_rt->stat.tcp_sess_used--; - break; - case SESSION_TYPE_UDP: - if (session_table_find_sessid(sess_mgr_rt->udp_sess_table, session_get_id(sess), 0) == sess) - { - session_table_del(sess_mgr_rt->udp_sess_table, sess); - } - SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), udp); - sess_mgr_rt->stat.udp_sess_used--; - break; - default: - assert(0); - break; - } - - packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_C2S)); - packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_S2C)); - session_set_first_packet(sess, FLOW_TYPE_C2S, NULL); - session_set_first_packet(sess, FLOW_TYPE_S2C, NULL); - session_clear_route_ctx(sess, FLOW_TYPE_C2S); - session_clear_route_ctx(sess, FLOW_TYPE_S2C); - session_clear_sids(sess, FLOW_TYPE_C2S); - session_clear_sids(sess, FLOW_TYPE_S2C); - session_set_current_state(sess, SESSION_STATE_INIT); - session_set_current_packet(sess, NULL); - session_set_flow_type(sess, FLOW_TYPE_NONE); - session_pool_push(sess_mgr_rt->sess_pool, sess); - 
sess = NULL; - } -} - -struct session *session_manager_runtime_lookup_session_by_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt) -{ - struct tuple6 key; - if (packet_get_innermost_tuple6(pkt, &key)) - { - return NULL; - } - switch (key.ip_proto) - { - case IPPROTO_UDP: - return session_table_find_tuple6(sess_mgr_rt->udp_sess_table, &key, 0); - case IPPROTO_TCP: - return session_manager_runtime_lookup_tcp_session(sess_mgr_rt, pkt, &key); - default: - return NULL; - } -} - -struct session *session_manager_runtime_lookup_session_by_id(struct session_manager_runtime *sess_mgr_rt, uint64_t sess_id) -{ - struct session *sess = NULL; - sess = session_table_find_sessid(sess_mgr_rt->tcp_sess_table, sess_id, 1); - if (sess) - { - return sess; - } - - sess = session_table_find_sessid(sess_mgr_rt->udp_sess_table, sess_id, 1); - if (sess) - { - return sess; - } - - return NULL; -} - -int session_manager_runtime_update_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, uint64_t now_ms) -{ - sess_mgr_rt->now_ms = now_ms; - - struct tuple6 key; - if (packet_get_innermost_tuple6(pkt, &key)) - { - return -1; - } - if (session_manager_runtime_bypass_duplicated_packet(sess_mgr_rt, sess, pkt, &key)) - { - return -1; - } - switch (session_get_type(sess)) - { - case SESSION_TYPE_TCP: - return session_manager_runtime_update_tcp_session(sess_mgr_rt, sess, pkt, &key); - case SESSION_TYPE_UDP: - return session_manager_runtime_update_udp_session(sess_mgr_rt, sess, pkt, &key); - default: - return -1; - } -} - -struct session *session_manager_runtime_get_expired_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms) -{ - sess_mgr_rt->now_ms = now_ms; - - struct session *sess = session_timer_expire(sess_mgr_rt->sess_timer, now_ms); - if (sess) - { - enum session_state curr_state = session_get_current_state(sess); - enum session_state next_state = session_transition_run(curr_state, TIMEOUT); - 
session_transition_log(sess, curr_state, next_state, TIMEOUT); - session_set_current_state(sess, next_state); - - switch (session_get_type(sess)) - { - case SESSION_TYPE_TCP: - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); - break; - case SESSION_TYPE_UDP: - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); - break; - default: - assert(0); - break; - } - - // next state is closed, need to free session - if (next_state == SESSION_STATE_CLOSED) - { - if (!session_get_closing_reason(sess)) - { - session_set_closing_reason(sess, CLOSING_BY_TIMEOUT); - } - return sess; - } - // next state is closing, only update timeout - else - { - switch (session_get_type(sess)) - { - case SESSION_TYPE_TCP: - session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.data); - break; - case SESSION_TYPE_UDP: - session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data); - break; - default: - assert(0); - break; - } - return NULL; - } - } - - return NULL; -} - -struct session *session_manager_runtime_get_evicted_session(struct session_manager_runtime *sess_mgr_rt) -{ - struct session *sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list); - if (sess) - { - TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe); - } - return sess; -} - -uint64_t session_manager_runtime_clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms, struct session *cleaned_sess[], uint64_t array_size) -{ - sess_mgr_rt->now_ms = now_ms; - struct session *sess = NULL; - uint64_t cleaned_sess_num = 0; - uint64_t expired_sess_num = 0; - - uint8_t expired_sess_canbe_clean = 0; - if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms) - { - expired_sess_canbe_clean = 1; - } - - for (uint64_t i = 0; i < array_size; i++) - { - // frist clean evicted session - sess = session_manager_runtime_get_evicted_session(sess_mgr_rt); - if (sess) - { - 
cleaned_sess[cleaned_sess_num++] = sess; - } - // then clean expired session - else - { - if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rt->cfg.expire_batch_max) - { - sess_mgr_rt->last_clean_expired_sess_ts = now_ms; - sess = session_manager_runtime_get_expired_session(sess_mgr_rt, now_ms); - if (sess) - { - cleaned_sess[cleaned_sess_num++] = sess; - expired_sess_num++; - } - else - { - break; - } - } - else - { - break; - } - } - } - - return cleaned_sess_num; + return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp_stream, (on_msg_cb_func *)(void *)cb, args); } /****************************************************************************** - * stat -- get / print + * session manager module ******************************************************************************/ -struct session_manager_stat *session_manager_runtime_get_stat(struct session_manager_runtime *sess_mgr_rt) +struct stellar_module *session_manager_module_on_init(struct stellar_module_manager *mod_mgr) { - return &sess_mgr_rt->stat; -} + struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, "packet_manager_module"); + struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod); + struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr); + const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr); -void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr_rt) -{ - struct session_manager_stat *stat = &sess_mgr_rt->stat; + struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_schema, toml_file); + if (sess_mgr == NULL) + { + return NULL; + } - // TCP session - SESSION_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", - stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active, - stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed); - // UDP session 
- SESSION_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", - stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active, - stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed); - // evicted session - SESSION_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted); - // Bypassed packet - SESSION_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu", - stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated); - SESSION_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu", - stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated); - // TCP segment - SESSION_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu", - stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited, - stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered, - stat->tcp_segs_buffered, stat->tcp_segs_freed); -} + struct stellar_module *sess_mgr_mod = stellar_module_new("session_manager_module", sess_mgr); + if (sess_mgr_mod == NULL) + { + SESSION_MANAGER_LOG_ERROR("failed to create session_manager_module"); + session_manager_free(sess_mgr); + return NULL; + } + stellar_module_set_ctx(sess_mgr_mod, sess_mgr); -/****************************************************************************** - * scan - ******************************************************************************/ - -static inline uint8_t ipv4_in_range(const struct in_addr *addr, const struct in_addr *start, const struct in_addr *end) -{ - return (memcmp(addr, start, sizeof(struct in_addr)) >= 0 && memcmp(addr, end, 
sizeof(struct in_addr)) <= 0); + SESSION_MANAGER_LOG_INFO("session_manager_module initialized"); + return sess_mgr_mod; } -static inline uint8_t ipv6_in_range(const struct in6_addr *addr, const struct in6_addr *start, const struct in6_addr *end) -{ - return (memcmp(addr, start, sizeof(struct in6_addr)) >= 0 && memcmp(addr, end, sizeof(struct in6_addr)) <= 0); -} - -uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess_mgr_rt, const struct session_scan_opts *opts, uint64_t mached_sess_ids[], uint64_t array_size) -{ - uint64_t capacity = 0; - uint64_t max_loop = 0; - uint64_t mached_sess_num = 0; - const struct session *sess = NULL; - const struct tuple6 *tuple = NULL; - - if (sess_mgr_rt == NULL || opts == NULL || mached_sess_ids == NULL || array_size == 0) - { - return mached_sess_num; - } - if (opts->count == 0) - { - return mached_sess_num; - } - capacity = session_pool_capacity_size(sess_mgr_rt->sess_pool); - if (opts->cursor >= capacity) - { - return mached_sess_num; - } - - max_loop = MIN(capacity, opts->cursor + opts->count); - for (uint64_t i = opts->cursor; i < max_loop; i++) - { - sess = session_pool_get0(sess_mgr_rt->sess_pool, i); - tuple = session_get_tuple6(sess); - if (session_get_current_state(sess) == SESSION_STATE_INIT) - { - continue; - } - - if ((opts->flags & SESSION_SCAN_TYPE) && opts->type != session_get_type(sess)) - { - continue; - } - if ((opts->flags & SESSION_SCAN_STATE) && opts->state != session_get_current_state(sess)) - { - continue; - } - if ((opts->flags & SESSION_SCAN_CREATE_TIME) && - (session_get_timestamp(sess, SESSION_TIMESTAMP_START) < opts->create_time_ms[0] || - session_get_timestamp(sess, SESSION_TIMESTAMP_START) > opts->create_time_ms[1])) - { - continue; - } - if ((opts->flags & SESSION_SCAN_LAST_PKT_TIME) && - (session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) < opts->last_pkt_time_ms[0] || - session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) > opts->last_pkt_time_ms[1])) - { - continue; - } 
- if ((opts->flags & SESSION_SCAN_SPORT) && opts->src_port != tuple->src_port) - { - continue; - } - if ((opts->flags & SESSION_SCAN_DPORT) && opts->dst_port != tuple->dst_port) - { - continue; - } - if (opts->flags & SESSION_SCAN_SIP) - { - if (opts->addr_family != tuple->addr_family) - { - continue; - } - if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->src_addr.v4, &opts->src_addr[0].v4, &opts->src_addr[1].v4)) - { - continue; - } - if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->src_addr.v6, &opts->src_addr[0].v6, &opts->src_addr[1].v6)) - { - continue; - } - } - if (opts->flags & SESSION_SCAN_DIP) - { - if (opts->addr_family != tuple->addr_family) - { - continue; - } - if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->dst_addr.v4, &opts->dst_addr[0].v4, &opts->dst_addr[1].v4)) - { - continue; - } - if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->dst_addr.v6, &opts->dst_addr[0].v6, &opts->dst_addr[1].v6)) - { - continue; - } - } - - mached_sess_ids[mached_sess_num++] = session_get_id(sess); - if (mached_sess_num >= array_size) - { - break; - } - } - - SESSION_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num); - return mached_sess_num; -} - -/****************************************************************************** - * other - ******************************************************************************/ - -void session_set_discard(struct session *sess) +void session_manager_module_on_exit(struct stellar_module_manager *mod_mgr, struct stellar_module *mod) { - struct session_manager_runtime *sess_mgr_rt = sess->sess_mgr_rt; - enum session_type type = session_get_type(sess); - enum session_state curr_state = session_get_current_state(sess); - enum session_state next_state = session_transition_run(curr_state, USER_CLOSE); - session_transition_log(sess, curr_state, next_state, USER_CLOSE); - session_set_current_state(sess, next_state); + if (mod) 
+ { + struct session_manager *sess_mgr = stellar_module_get_ctx(mod); - switch (type) - { - case SESSION_TYPE_TCP: - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.discard_default); - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); - break; - case SESSION_TYPE_UDP: - session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default); - SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); - break; - default: - assert(0); - break; - } + session_manager_free(sess_mgr); + stellar_module_free(mod); + SESSION_MANAGER_LOG_ERROR("session_manager_module exited"); + } }
\ No newline at end of file diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c new file mode 100644 index 0000000..f7786a1 --- /dev/null +++ b/infra/session_manager/session_manager_runtime.c @@ -0,0 +1,1462 @@ +#include <time.h> +#include <stdlib.h> +#include <assert.h> +#include <errno.h> + +#include "utils.h" +#include "packet_helper.h" +#include "packet_filter.h" +#include "session_internal.h" +#include "session_pool.h" +#include "session_table.h" +#include "session_timer.h" +#include "session_filter.h" +#include "session_transition.h" +#include "session_manager_runtime.h" + +#define SESSION_MANAGER_RUNTIME_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__) +#define SESSION_MANAGER_RUNTIME_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__) +#define SESSION_MANAGER_RUNTIME_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__) + +struct snowflake +{ + uint64_t seed; + uint64_t sequence; +}; + +struct session_manager_runtime +{ + struct session_list evicte_list; + struct session_pool *sess_pool; + struct session_timer *sess_timer; + struct session_table *tcp_sess_table; + struct session_table *udp_sess_table; + + struct packet_filter *dup_pkt_filter; + struct session_filter *evicte_sess_filter; + + struct session_manager_stat stat; + struct session_manager_config cfg; + + /* + * only used for session_set_discard() or session_manager_runtime_record_duplicated_packet(), + * because the function is called by plugin and has no time input. 
+ */ + uint64_t now_ms; + uint64_t last_clean_expired_sess_ts; + struct snowflake *sf; +}; + +#define EVICTE_SESSION_BURST (RX_BURST_MAX) + +/****************************************************************************** + * session manager stat macro + ******************************************************************************/ + +#define SESS_MGR_STAT_INC(stat, state, proto) \ + { \ + switch ((state)) \ + { \ + case SESSION_STATE_OPENING: \ + (stat)->proto##_sess_opening++; \ + break; \ + case SESSION_STATE_ACTIVE: \ + (stat)->proto##_sess_active++; \ + break; \ + case SESSION_STATE_CLOSING: \ + (stat)->proto##_sess_closing++; \ + break; \ + case SESSION_STATE_DISCARD: \ + (stat)->proto##_sess_discard++; \ + break; \ + case SESSION_STATE_CLOSED: \ + (stat)->proto##_sess_closed++; \ + break; \ + default: \ + break; \ + } \ + } + +#define SESS_MGR_STAT_DEC(stat, state, proto) \ + { \ + switch ((state)) \ + { \ + case SESSION_STATE_OPENING: \ + (stat)->proto##_sess_opening--; \ + break; \ + case SESSION_STATE_ACTIVE: \ + (stat)->proto##_sess_active--; \ + break; \ + case SESSION_STATE_CLOSING: \ + (stat)->proto##_sess_closing--; \ + break; \ + case SESSION_STATE_DISCARD: \ + (stat)->proto##_sess_discard--; \ + break; \ + case SESSION_STATE_CLOSED: \ + (stat)->proto##_sess_closed--; \ + break; \ + default: \ + break; \ + } \ + } + +#define SESS_MGR_STAT_UPDATE(stat, curr, next, proto) \ + { \ + if (curr != next) \ + { \ + SESS_MGR_STAT_DEC(stat, curr, proto); \ + SESS_MGR_STAT_INC(stat, next, proto); \ + } \ + } + +/****************************************************************************** + * snowflake + ******************************************************************************/ + +static struct snowflake *snowflake_new(uint64_t seed) +{ + struct snowflake *sf = (struct snowflake *)calloc(1, sizeof(struct snowflake)); + if (sf == NULL) + { + return NULL; + } + + sf->seed = seed & 0xFFFFF; + sf->sequence = 0; + + return sf; +} + +static void 
snowflake_free(struct snowflake *sf) +{ + if (sf != NULL) + { + free(sf); + sf = NULL; + } +} + +/* + * high -> low + * + * +------+------------------+----------------+------------------------+---------------------------+ + * | 1bit | 12bit device_id | 8bit thread_id | 28bit timestamp in sec | 15bit sequence per thread | + * +------+------------------+----------------+------------------------+---------------------------+ + */ + +#define MAX_ID_PER_THREAD (32768) +#define MAX_ID_BASE_TIME (268435456L) + +static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec) +{ + uint64_t id = 0; + uint64_t id_per_thread = (sf->sequence++) % MAX_ID_PER_THREAD; + uint64_t id_base_time = now_sec % MAX_ID_BASE_TIME; + + id = (sf->seed << 43) | (id_base_time << 15) | (id_per_thread); + + return id; +} + +/****************************************************************************** + * TCP utils + ******************************************************************************/ + +static void tcp_clean(struct session_manager_runtime *sess_mgr_rt, struct session *sess) +{ + struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[FLOW_TYPE_C2S].assembler; + struct tcp_reassembly *s2c_ssembler = sess->tcp_halfs[FLOW_TYPE_S2C].assembler; + struct tcp_segment *seg; + if (c2s_ssembler) + { + while ((seg = tcp_reassembly_expire(c2s_ssembler, UINT64_MAX))) + { + session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_SEGMENTS_RELEASED, 1); + session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_PAYLOADS_RELEASED, seg->len); + sess_mgr_rt->stat.tcp_segs_freed++; + tcp_segment_free(seg); + } + tcp_reassembly_free(c2s_ssembler); + } + if (s2c_ssembler) + { + while ((seg = tcp_reassembly_expire(s2c_ssembler, UINT64_MAX))) + { + session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_SEGMENTS_RELEASED, 1); + session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_PAYLOADS_RELEASED, seg->len); + sess_mgr_rt->stat.tcp_segs_freed++; + tcp_segment_free(seg); + } + tcp_reassembly_free(s2c_ssembler); + } +} + +static int 
tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session *sess) +{ + if (!sess_mgr_rt->cfg.tcp_reassembly.enable) + { + return 0; + } + + sess->tcp_halfs[FLOW_TYPE_C2S].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); + sess->tcp_halfs[FLOW_TYPE_S2C].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max); + if (sess->tcp_halfs[FLOW_TYPE_C2S].assembler == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].assembler == NULL) + { + tcp_clean(sess_mgr_rt, sess); + return -1; + } + + SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p", + session_get_id(sess), session_get0_readable_addr(sess), + sess->tcp_halfs[FLOW_TYPE_C2S].assembler, + sess->tcp_halfs[FLOW_TYPE_S2C].assembler); + + return 0; +} + +static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum flow_type type, const struct layer_private *tcp_layer) +{ + struct tcp_segment *seg; + struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr; + struct tcp_half *half = &sess->tcp_halfs[type]; + uint8_t flags = tcp_hdr_get_flags(hdr); + uint16_t len = tcp_layer->pld_len; + + if ((flags & TH_SYN) && half->isn == 0) + { + half->isn = tcp_hdr_get_seq(hdr); + } + half->flags = flags; + half->history |= flags; + half->seq = tcp_hdr_get_seq(hdr); + half->ack = tcp_hdr_get_ack(hdr); + half->len = tcp_layer->pld_len; + + if (!sess_mgr_rt->cfg.tcp_reassembly.enable) + { + if (len) + { + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len); + sess_mgr_rt->stat.tcp_segs_input++; + + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len); + sess_mgr_rt->stat.tcp_segs_inorder++; + + half->in_order.data = tcp_layer->pld_ptr; + half->in_order.len = len; + 
half->in_order_ref = 0; + } + return; + } + + if (unlikely(flags & TH_SYN)) + { + // len > 0 is SYN with data (TCP Fast Open) + tcp_reassembly_set_recv_next(half->assembler, len ? half->seq : half->seq + 1); + } + + seg = tcp_reassembly_expire(half->assembler, sess_mgr_rt->now_ms); + if (seg) + { + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_EXPIRED, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_EXPIRED, seg->len); + sess_mgr_rt->stat.tcp_segs_timeout++; + + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RELEASED, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RELEASED, seg->len); + sess_mgr_rt->stat.tcp_segs_freed++; + + tcp_segment_free(seg); + } + + if (len) + { + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len); + sess_mgr_rt->stat.tcp_segs_input++; + + uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler); + // in order + if (half->seq == rcv_nxt) + { + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len); + sess_mgr_rt->stat.tcp_segs_inorder++; + + half->in_order.data = tcp_layer->pld_ptr; + half->in_order.len = len; + half->in_order_ref = 0; + tcp_reassembly_inc_recv_next(half->assembler, len); + } + // retransmission + else if (uint32_before(uint32_add(half->seq, len), rcv_nxt)) + { + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len); + sess_mgr_rt->stat.tcp_segs_retransmited++; + } + else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len))) + { + switch (tcp_reassembly_push(half->assembler, seg, sess_mgr_rt->now_ms)) + { + case -2: + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len); + sess_mgr_rt->stat.tcp_segs_retransmited++; + tcp_segment_free(seg); + break; + case -1: + session_inc_stat(sess, type, 
STAT_TCP_SEGMENTS_NOSPACE, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len); + sess_mgr_rt->stat.tcp_segs_omitted_too_many++; + tcp_segment_free(seg); + break; + case 0: + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len); + sess_mgr_rt->stat.tcp_segs_buffered++; + break; + case 1: + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_OVERLAP, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_OVERLAP, len); + sess_mgr_rt->stat.tcp_segs_overlapped++; + + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len); + sess_mgr_rt->stat.tcp_segs_buffered++; + break; + default: + assert(0); + break; + } + } + else + { + session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1); + session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len); + sess_mgr_rt->stat.tcp_segs_omitted_too_many++; + } + } +} + +/****************************************************************************** + * session flow + ******************************************************************************/ + +static enum flow_type identify_flow_type_by_port(uint16_t src_port, uint16_t dst_port) +{ + // big port is client + if (src_port > dst_port) + { + return FLOW_TYPE_C2S; + } + else if (src_port < dst_port) + { + return FLOW_TYPE_S2C; + } + else + { + // if port is equal, first packet is C2S + return FLOW_TYPE_C2S; + } +} + +static enum flow_type identify_flow_type_by_history(const struct session *sess, const struct tuple6 *key) +{ + if (tuple6_cmp(session_get_tuple6(sess), key) == 0) + { + return FLOW_TYPE_C2S; + } + else + { + return FLOW_TYPE_S2C; + } +} + +/****************************************************************************** + * bypass packet -- table limit / session evicted / duplicated packet + ******************************************************************************/ + +static int 
session_manager_runtime_bypass_packet_on_tcp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key) +{ + if (key->ip_proto == IPPROTO_TCP && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max) + { + sess_mgr_rt->stat.tcp_pkts_bypass_table_full++; + return 1; + } + return 0; +} + +static int session_manager_runtime_bypass_packet_on_udp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key) +{ + if (key->ip_proto == IPPROTO_UDP && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max) + { + sess_mgr_rt->stat.udp_pkts_bypass_table_full++; + return 1; + } + return 0; +} + +static int session_manager_runtime_bypass_packet_on_session_evicted(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key) +{ + if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable && session_filter_lookup(sess_mgr_rt->evicte_sess_filter, key, sess_mgr_rt->now_ms)) + { + sess_mgr_rt->stat.udp_pkts_bypass_session_evicted++; + return 1; + } + + return 0; +} + +static int session_manager_runtime_bypass_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key) +{ + if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable == 0) + { + return 0; + } + + enum flow_type type = identify_flow_type_by_history(sess, key); + if (session_get_stat(sess, type, STAT_RAW_PACKETS_RECEIVED) < 3 || session_has_duplicate_traffic(sess)) + { + if (packet_filter_lookup(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms)) + { + session_inc_stat(sess, type, STAT_DUPLICATE_PACKETS_BYPASS, 1); + session_inc_stat(sess, type, STAT_DUPLICATE_BYTES_BYPASS, packet_get_raw_len(pkt)); + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + sess_mgr_rt->stat.tcp_pkts_bypass_duplicated++; + break; + case SESSION_TYPE_UDP: + sess_mgr_rt->stat.udp_pkts_bypass_duplicated++; + break; + default: + assert(0); + break; + } + 
session_set_duplicate_traffic(sess); + + session_set_current_packet(sess, pkt); + session_set_flow_type(sess, type); + return 1; + } + else + { + packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms); + return 0; + } + } + + return 0; +} + +void session_manager_runtime_record_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt) +{ + if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) + { + packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms); + } +} + +/****************************************************************************** + * config -- new / free / print + ******************************************************************************/ + +struct session_manager_config *session_manager_config_new(const char *toml_file) +{ + if (toml_file == NULL) + { + return NULL; + } + + struct session_manager_config *sess_mgr_cfg = (struct session_manager_config *)calloc(1, sizeof(struct session_manager_config)); + if (sess_mgr_cfg == NULL) + { + return NULL; + } + + int ret = 0; + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000); + ret += load_and_validate_toml_integer_config(toml_file, 
"session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 
1, 4294967295); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000); + ret += load_and_validate_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000); + ret += load_and_validate_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0); + + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000); + ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512); + + if (ret != 0) + { + session_manager_config_free(sess_mgr_cfg); + return NULL; + } + + return sess_mgr_cfg; +} + +void session_manager_config_free(struct session_manager_config *sess_mgr_cfg) +{ + if (sess_mgr_cfg) + { + free(sess_mgr_cfg); + sess_mgr_cfg = NULL; + } +} + +void 
session_manager_config_print(struct session_manager_config *sess_mgr_cfg) +{ + if (sess_mgr_cfg) + { + // max session number + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max); + + // session overload + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit); + + // TCP timeout + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst); + + // UDP timeout + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default); + + // limit + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms); + 
SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max); + + // duplicated packet filter + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate); + + // eviction session filter + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate); + + // TCP reassembly + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms); + SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max); + } +} + +/****************************************************************************** + * runtime -- new / free + 
******************************************************************************/ + +struct session_manager_runtime *session_manager_runtime_new(const struct session_manager_config *sess_mgr_cfg, uint64_t now_ms) +{ + struct session_manager_runtime *sess_mgr_rt = (struct session_manager_runtime *)calloc(1, sizeof(struct session_manager_runtime)); + if (sess_mgr_rt == NULL) + { + return NULL; + } + memcpy(&sess_mgr_rt->cfg, sess_mgr_cfg, sizeof(struct session_manager_config)); + + sess_mgr_rt->sess_pool = session_pool_new(sess_mgr_rt->cfg.tcp_session_max + sess_mgr_rt->cfg.udp_session_max); + sess_mgr_rt->tcp_sess_table = session_table_new(); + sess_mgr_rt->udp_sess_table = session_table_new(); + sess_mgr_rt->sess_timer = session_timer_new(now_ms); + if (sess_mgr_rt->sess_pool == NULL || sess_mgr_rt->tcp_sess_table == NULL || sess_mgr_rt->udp_sess_table == NULL || sess_mgr_rt->sess_timer == NULL) + { + goto error; + } + if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) + { + sess_mgr_rt->evicte_sess_filter = session_filter_new(sess_mgr_rt->cfg.evicted_session_bloom_filter.capacity, + sess_mgr_rt->cfg.evicted_session_bloom_filter.time_window_ms, + sess_mgr_rt->cfg.evicted_session_bloom_filter.error_rate, now_ms); + if (sess_mgr_rt->evicte_sess_filter == NULL) + { + goto error; + } + } + if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) + { + sess_mgr_rt->dup_pkt_filter = packet_filter_new(sess_mgr_rt->cfg.duplicated_packet_bloom_filter.capacity, + sess_mgr_rt->cfg.duplicated_packet_bloom_filter.time_window_ms, + sess_mgr_rt->cfg.duplicated_packet_bloom_filter.error_rate, now_ms); + if (sess_mgr_rt->dup_pkt_filter == NULL) + { + goto error; + } + } + sess_mgr_rt->sf = snowflake_new(sess_mgr_rt->cfg.session_id_seed); + if (sess_mgr_rt->sf == NULL) + { + goto error; + } + + TAILQ_INIT(&sess_mgr_rt->evicte_list); + session_transition_init(); + sess_mgr_rt->now_ms = now_ms; + sess_mgr_rt->last_clean_expired_sess_ts = now_ms; + + return sess_mgr_rt; + 
+error: + session_manager_runtime_free(sess_mgr_rt); + return NULL; +} + +void session_manager_runtime_free(struct session_manager_runtime *sess_mgr_rt) +{ + struct session *sess; + if (sess_mgr_rt) + { + // free all evicted session + while ((sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list))) + { + TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe); + session_manager_runtime_free_session(sess_mgr_rt, sess); + } + // free all udp session + while (sess_mgr_rt->udp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->udp_sess_table))) + { + session_manager_runtime_free_session(sess_mgr_rt, sess); + } + // free all tcp session + while (sess_mgr_rt->tcp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table))) + { + session_manager_runtime_free_session(sess_mgr_rt, sess); + } + if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) + { + session_filter_free(sess_mgr_rt->evicte_sess_filter); + } + if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) + { + packet_filter_free(sess_mgr_rt->dup_pkt_filter); + } + snowflake_free(sess_mgr_rt->sf); + session_timer_free(sess_mgr_rt->sess_timer); + session_table_free(sess_mgr_rt->udp_sess_table); + session_table_free(sess_mgr_rt->tcp_sess_table); + session_pool_free(sess_mgr_rt->sess_pool); + free(sess_mgr_rt); + sess_mgr_rt = NULL; + } +} + +/****************************************************************************** + * session -- new / free / lookup / updata / expire / evicte / clean + ******************************************************************************/ + +static void session_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum flow_type type) +{ + if (session_get_current_state(sess) == SESSION_STATE_INIT) + { + uint64_t sess_id = snowflake_generate(sess_mgr_rt->sf, sess_mgr_rt->now_ms / 1000); + session_set_id(sess, sess_id); + enum packet_direction pkt_dir = 
packet_get_direction(pkt); + if (type == FLOW_TYPE_C2S) + { + session_set_tuple6(sess, key); + if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External + { + session_set_direction(sess, SESSION_DIRECTION_OUTBOUND); + } + else + { + session_set_direction(sess, SESSION_DIRECTION_INBOUND); + } + tuple6_to_str(key, sess->tuple_str, sizeof(sess->tuple_str)); + } + else + { + struct tuple6 out; + tuple6_reverse(key, &out); + session_set_tuple6(sess, &out); + if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External + { + session_set_direction(sess, SESSION_DIRECTION_INBOUND); + } + else + { + session_set_direction(sess, SESSION_DIRECTION_OUTBOUND); + } + tuple6_to_str(&out, sess->tuple_str, sizeof(sess->tuple_str)); + } + + session_set_timestamp(sess, SESSION_TIMESTAMP_START, sess_mgr_rt->now_ms); + switch (key->ip_proto) + { + case IPPROTO_TCP: + session_set_type(sess, SESSION_TYPE_TCP); + break; + case IPPROTO_UDP: + session_set_type(sess, SESSION_TYPE_UDP); + break; + default: + assert(0); + break; + } + } + + session_inc_stat(sess, type, STAT_RAW_PACKETS_RECEIVED, 1); + session_inc_stat(sess, type, STAT_RAW_BYTES_RECEIVED, packet_get_raw_len(pkt)); + + if (!session_get_first_packet(sess, type)) + { + session_set_first_packet(sess, type, packet_dup(pkt)); + session_set_route_ctx(sess, type, packet_get_route_ctx(pkt)); + session_set_sids(sess, type, packet_get_sids(pkt)); + } + + session_set_current_packet(sess, pkt); + session_set_flow_type(sess, type); + session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, sess_mgr_rt->now_ms); + session_set_current_state(sess, next_state); +} + +static void session_manager_runtime_evicte_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, int reason) +{ + if (sess == NULL) + { + return; + } + + // when session add to evicted queue, session lifetime is over + enum session_state curr_state = session_get_current_state(sess); + enum session_state next_state = session_transition_run(curr_state, 
reason); + session_transition_log(sess, curr_state, next_state, reason); + session_set_current_state(sess, next_state); + if (!session_get_closing_reason(sess)) + { + if (reason == PORT_REUSE_EVICT) + { + session_set_closing_reason(sess, CLOSING_BY_PORT_REUSE_EVICTED); + } + if (reason == LRU_EVICT) + { + session_set_closing_reason(sess, CLOSING_BY_LRU_EVICTED); + } + } + session_timer_del(sess_mgr_rt->sess_timer, sess); + TAILQ_INSERT_TAIL(&sess_mgr_rt->evicte_list, sess, evicte_tqe); + + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess)); + session_table_del(sess_mgr_rt->tcp_sess_table, sess); + SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); + sess_mgr_rt->stat.tcp_sess_evicted++; + break; + case SESSION_TYPE_UDP: + SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess)); + session_table_del(sess_mgr_rt->udp_sess_table, sess); + if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable) + { + session_filter_add(sess_mgr_rt->evicte_sess_filter, session_get_tuple6(sess), sess_mgr_rt->now_ms); + } + SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); + sess_mgr_rt->stat.udp_sess_evicted++; + break; + default: + assert(0); + break; + } +} + +static struct session *session_manager_runtime_lookup_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key) +{ + struct session *sess = session_table_find_tuple6(sess_mgr_rt->tcp_sess_table, key, 0); + if (sess == NULL) + { + return NULL; + } + + const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP); + const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; + uint8_t flags = tcp_hdr_get_flags(hdr); + if ((flags & TH_SYN) == 0) + { + return sess; + } + + enum flow_type type = identify_flow_type_by_history(sess, key); + struct tcp_half *half = 
&sess->tcp_halfs[type]; + if ((half->isn && half->isn != tcp_hdr_get_seq(hdr)) || // recv SYN with different ISN + ((half->history & TH_FIN) || (half->history & TH_RST))) // recv SYN after FIN or RST + { + // TCP port reuse, evict old session + session_manager_runtime_evicte_session(sess_mgr_rt, sess, PORT_REUSE_EVICT); + return NULL; + } + else + { + // TCP SYN retransmission + return sess; + } +} + +static struct session *session_manager_runtime_new_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key) +{ + const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP); + const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; + uint8_t flags = tcp_hdr_get_flags(hdr); + if (!(flags & TH_SYN)) + { + sess_mgr_rt->stat.tcp_pkts_bypass_session_not_found++; + return NULL; + } + + // tcp table full evict old session + if (sess_mgr_rt->cfg.evict_old_on_tcp_table_limit && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max - EVICTE_SESSION_BURST) + { + struct session *evic_sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table); + session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT); + } + + enum flow_type type = (flags & TH_ACK) ? FLOW_TYPE_S2C : FLOW_TYPE_C2S; + struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool); + if (sess == NULL) + { + assert(0); + return NULL; + } + session_init(sess); + sess->sess_mgr_rt = sess_mgr_rt; + sess->sess_mgr_stat = &sess_mgr_rt->stat; + + enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN); + session_update(sess_mgr_rt, sess, next_state, pkt, key, type); + session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN); + + if (tcp_init(sess_mgr_rt, sess) == -1) + { + assert(0); + session_pool_push(sess_mgr_rt->sess_pool, sess); + return NULL; + } + tcp_update(sess_mgr_rt, sess, type, tcp_layer); + + uint64_t timeout = (flags & TH_ACK) ? 
sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init; + session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout); + session_table_add(sess_mgr_rt->tcp_sess_table, sess); + + if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable) + { + packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms); + } + + SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, tcp); + sess_mgr_rt->stat.tcp_sess_used++; + sess_mgr_rt->stat.history_tcp_sessions++; + + return sess; +} + +static struct session *session_manager_runtime_new_udp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key) +{ + // udp table full evict old session + if (sess_mgr_rt->cfg.evict_old_on_udp_table_limit && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max - EVICTE_SESSION_BURST) + { + struct session *evic_sess = session_table_find_lru(sess_mgr_rt->udp_sess_table); + session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT); + } + + struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool); + if (sess == NULL) + { + assert(sess); + return NULL; + } + session_init(sess); + sess->sess_mgr_rt = sess_mgr_rt; + sess->sess_mgr_stat = &sess_mgr_rt->stat; + + enum flow_type type = identify_flow_type_by_port(ntohs(key->src_port), ntohs(key->dst_port)); + enum session_state next_state = session_transition_run(SESSION_STATE_INIT, UDP_DATA); + session_update(sess_mgr_rt, sess, next_state, pkt, key, type); + session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA); + + session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data); + session_table_add(sess_mgr_rt->udp_sess_table, sess); + + SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, udp); + sess_mgr_rt->stat.udp_sess_used++; + sess_mgr_rt->stat.history_udp_sessions++; + + return sess; +} + +static int 
session_manager_runtime_update_tcp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key) +{ + const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP); + const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr; + enum flow_type type = identify_flow_type_by_history(sess, key); + uint8_t flags = tcp_hdr_get_flags(hdr); + int inputs = 0; + inputs |= (flags & TH_SYN) ? TCP_SYN : NONE; + inputs |= (flags & TH_FIN) ? TCP_FIN : NONE; + inputs |= (flags & TH_RST) ? TCP_RST : NONE; + inputs |= tcp_layer->pld_len ? TCP_DATA : NONE; + + // update state + enum session_state curr_state = session_get_current_state(sess); + enum session_state next_state = session_transition_run(curr_state, inputs); + + // update session + session_update(sess_mgr_rt, sess, next_state, pkt, key, type); + session_transition_log(sess, curr_state, next_state, inputs); + + // update tcp + tcp_update(sess_mgr_rt, sess, type, tcp_layer); + + // set closing reason + if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess)) + { + if (flags & TH_FIN) + { + session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN)); + } + if (flags & TH_RST) + { + session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST)); + } + } + + // update timeout + struct tcp_half *curr = &sess->tcp_halfs[type]; + struct tcp_half *peer = &sess->tcp_halfs[(type == FLOW_TYPE_C2S ? FLOW_TYPE_S2C : FLOW_TYPE_C2S)]; + uint64_t timeout = 0; + switch (next_state) + { + case SESSION_STATE_OPENING: + if (flags & TH_SYN) + { + timeout = (flags & TH_ACK) ? 
sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init; + } + else + { + timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data; + } + break; + case SESSION_STATE_ACTIVE: + timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data; + break; + case SESSION_STATE_CLOSING: + if (flags & TH_FIN) + { + timeout = (peer->history & TH_FIN) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.half_closed; + } + else if (flags & TH_RST) + { + // if fin is received, the expected sequence number should be increased by 1 + uint32_t expected = (peer->history & TH_FIN) ? peer->ack + 1 : peer->ack; + timeout = (expected == curr->seq) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.unverified_rst; + } + else + { + timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data; + } + break; + case SESSION_STATE_DISCARD: + timeout = sess_mgr_rt->cfg.tcp_timeout_ms.discard_default; + break; + default: + assert(0); + break; + } + session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout); + + SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); + + return 0; +} + +static int session_manager_runtime_update_udp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key) +{ + enum flow_type type = identify_flow_type_by_history(sess, key); + enum session_state curr_state = session_get_current_state(sess); + enum session_state next_state = session_transition_run(curr_state, UDP_DATA); + session_update(sess_mgr_rt, sess, next_state, pkt, key, type); + session_transition_log(sess, curr_state, next_state, UDP_DATA); + + if (session_get_current_state(sess) == SESSION_STATE_DISCARD) + { + session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default); + } + else + { + session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data); + } + 
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); + + return 0; +} + +struct session *session_manager_runtime_new_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, uint64_t now_ms) +{ + sess_mgr_rt->now_ms = now_ms; + + struct tuple6 key; + if (packet_get_innermost_tuple6(pkt, &key)) + { + return NULL; + } + switch (key.ip_proto) + { + case IPPROTO_TCP: + if (session_manager_runtime_bypass_packet_on_tcp_table_limit(sess_mgr_rt, &key)) + { + return NULL; + } + return session_manager_runtime_new_tcp_session(sess_mgr_rt, pkt, &key); + case IPPROTO_UDP: + if (session_manager_runtime_bypass_packet_on_session_evicted(sess_mgr_rt, &key)) + { + return NULL; + } + if (session_manager_runtime_bypass_packet_on_udp_table_limit(sess_mgr_rt, &key)) + { + return NULL; + } + return session_manager_runtime_new_udp_session(sess_mgr_rt, pkt, &key); + default: + return NULL; + } +} + +void session_manager_runtime_free_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess) +{ + if (sess) + { + SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess))); + + session_timer_del(sess_mgr_rt->sess_timer, sess); + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + tcp_clean(sess_mgr_rt, sess); + if (session_table_find_sessid(sess_mgr_rt->tcp_sess_table, session_get_id(sess), 0) == sess) + { + session_table_del(sess_mgr_rt->tcp_sess_table, sess); + } + SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), tcp); + sess_mgr_rt->stat.tcp_sess_used--; + break; + case SESSION_TYPE_UDP: + if (session_table_find_sessid(sess_mgr_rt->udp_sess_table, session_get_id(sess), 0) == sess) + { + session_table_del(sess_mgr_rt->udp_sess_table, sess); + } + SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), udp); + sess_mgr_rt->stat.udp_sess_used--; + break; + default: + assert(0); + break; + } + + 
packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_C2S)); + packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_S2C)); + session_set_first_packet(sess, FLOW_TYPE_C2S, NULL); + session_set_first_packet(sess, FLOW_TYPE_S2C, NULL); + session_clear_route_ctx(sess, FLOW_TYPE_C2S); + session_clear_route_ctx(sess, FLOW_TYPE_S2C); + session_clear_sids(sess, FLOW_TYPE_C2S); + session_clear_sids(sess, FLOW_TYPE_S2C); + session_set_current_state(sess, SESSION_STATE_INIT); + session_set_current_packet(sess, NULL); + session_set_flow_type(sess, FLOW_TYPE_NONE); + session_pool_push(sess_mgr_rt->sess_pool, sess); + sess = NULL; + } +} + +struct session *session_manager_runtime_lookup_session_by_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt) +{ + struct tuple6 key; + if (packet_get_innermost_tuple6(pkt, &key)) + { + return NULL; + } + switch (key.ip_proto) + { + case IPPROTO_UDP: + return session_table_find_tuple6(sess_mgr_rt->udp_sess_table, &key, 0); + case IPPROTO_TCP: + return session_manager_runtime_lookup_tcp_session(sess_mgr_rt, pkt, &key); + default: + return NULL; + } +} + +struct session *session_manager_runtime_lookup_session_by_id(struct session_manager_runtime *sess_mgr_rt, uint64_t sess_id) +{ + struct session *sess = NULL; + sess = session_table_find_sessid(sess_mgr_rt->tcp_sess_table, sess_id, 1); + if (sess) + { + return sess; + } + + sess = session_table_find_sessid(sess_mgr_rt->udp_sess_table, sess_id, 1); + if (sess) + { + return sess; + } + + return NULL; +} + +int session_manager_runtime_update_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, uint64_t now_ms) +{ + sess_mgr_rt->now_ms = now_ms; + + struct tuple6 key; + if (packet_get_innermost_tuple6(pkt, &key)) + { + return -1; + } + if (session_manager_runtime_bypass_duplicated_packet(sess_mgr_rt, sess, pkt, &key)) + { + return -1; + } + switch (session_get_type(sess)) + { + case 
SESSION_TYPE_TCP: + return session_manager_runtime_update_tcp_session(sess_mgr_rt, sess, pkt, &key); + case SESSION_TYPE_UDP: + return session_manager_runtime_update_udp_session(sess_mgr_rt, sess, pkt, &key); + default: + return -1; + } +} + +struct session *session_manager_runtime_get_expired_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms) +{ + sess_mgr_rt->now_ms = now_ms; + + struct session *sess = session_timer_expire(sess_mgr_rt->sess_timer, now_ms); + if (sess) + { + enum session_state curr_state = session_get_current_state(sess); + enum session_state next_state = session_transition_run(curr_state, TIMEOUT); + session_transition_log(sess, curr_state, next_state, TIMEOUT); + session_set_current_state(sess, next_state); + + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp); + break; + case SESSION_TYPE_UDP: + SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp); + break; + default: + assert(0); + break; + } + + // next state is closed, need to free session + if (next_state == SESSION_STATE_CLOSED) + { + if (!session_get_closing_reason(sess)) + { + session_set_closing_reason(sess, CLOSING_BY_TIMEOUT); + } + return sess; + } + // next state is closing, only update timeout + else + { + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.data); + break; + case SESSION_TYPE_UDP: + session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data); + break; + default: + assert(0); + break; + } + return NULL; + } + } + + return NULL; +} + +struct session *session_manager_runtime_get_evicted_session(struct session_manager_runtime *sess_mgr_rt) +{ + struct session *sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list); + if (sess) + { + TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe); + } + return sess; +} + 
+uint64_t session_manager_runtime_clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms, struct session *cleaned_sess[], uint64_t array_size) +{ + sess_mgr_rt->now_ms = now_ms; + struct session *sess = NULL; + uint64_t cleaned_sess_num = 0; + uint64_t expired_sess_num = 0; + + uint8_t expired_sess_canbe_clean = 0; + if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms) + { + expired_sess_canbe_clean = 1; + } + + for (uint64_t i = 0; i < array_size; i++) + { + // frist clean evicted session + sess = session_manager_runtime_get_evicted_session(sess_mgr_rt); + if (sess) + { + cleaned_sess[cleaned_sess_num++] = sess; + } + // then clean expired session + else + { + if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rt->cfg.expire_batch_max) + { + sess_mgr_rt->last_clean_expired_sess_ts = now_ms; + sess = session_manager_runtime_get_expired_session(sess_mgr_rt, now_ms); + if (sess) + { + cleaned_sess[cleaned_sess_num++] = sess; + expired_sess_num++; + } + else + { + break; + } + } + else + { + break; + } + } + } + + return cleaned_sess_num; +} + +/****************************************************************************** + * stat -- get / print + ******************************************************************************/ + +struct session_manager_stat *session_manager_runtime_get_stat(struct session_manager_runtime *sess_mgr_rt) +{ + return &sess_mgr_rt->stat; +} + +void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr_rt) +{ + struct session_manager_stat *stat = &sess_mgr_rt->stat; + + // TCP session + SESSION_MANAGER_RUNTIME_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", + stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active, + stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed); + // UDP session + SESSION_MANAGER_RUNTIME_LOG_INFO("UDP session: 
history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu", + stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active, + stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed); + // evicted session + SESSION_MANAGER_RUNTIME_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted); + // Bypassed packet + SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu", + stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated); + SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu", + stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated); + // TCP segment + SESSION_MANAGER_RUNTIME_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu", + stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited, + stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered, + stat->tcp_segs_buffered, stat->tcp_segs_freed); +}
+
+/******************************************************************************
+ * scan
+ ******************************************************************************/
+
+/*
+ * Return non-zero when `addr` lies in the inclusive range [start, end].
+ *
+ * The three addresses are compared byte by byte with memcmp().
+ * NOTE(review): a bytewise comparison matches numeric ordering only if all
+ * three values are held in network byte order -- confirm against the callers.
+ */
+static inline uint8_t ipv4_in_range(const struct in_addr *addr, const struct in_addr *start, const struct in_addr *end)
+{
+    return (memcmp(addr, start, sizeof(struct in_addr)) >= 0 && memcmp(addr, end, sizeof(struct in_addr)) <= 0);
+}
+
+/*
+ * Return non-zero when `addr` lies in the inclusive range [start, end].
+ *
+ * The three addresses are compared byte by byte with memcmp() over the whole
+ * struct in6_addr.
+ */
+static inline uint8_t ipv6_in_range(const struct in6_addr *addr, const struct in6_addr *start, const struct in6_addr *end)
+{
+    return (memcmp(addr, start, sizeof(struct in6_addr)) >= 0 && memcmp(addr, end, sizeof(struct in6_addr)) <= 0);
+}
+
+/*
+ * Walk a window of the session pool and collect the IDs of the sessions that
+ * satisfy every filter enabled in opts->flags.
+ *
+ * The window starts at pool index opts->cursor and covers at most opts->count
+ * slots; it is clamped to the pool capacity. Slots whose session is in
+ * SESSION_STATE_INIT are skipped.
+ *
+ * sess_mgr_rt     : runtime whose session pool is scanned.
+ * opts            : cursor, count and filter settings.
+ * mached_sess_ids : output array receiving the matching session IDs.
+ * array_size      : number of elements available in mached_sess_ids.
+ *
+ * Returns the number of IDs written (never more than array_size). Returns 0
+ * when any pointer argument is NULL, when array_size or opts->count is 0, or
+ * when opts->cursor is not below the pool capacity.
+ */
+uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess_mgr_rt, const struct session_scan_opts *opts, uint64_t mached_sess_ids[], uint64_t array_size)
+{
+    uint64_t capacity = 0;
+    uint64_t remaining = 0;
+    uint64_t max_loop = 0;
+    uint64_t mached_sess_num = 0;
+    const struct session *sess = NULL;
+    const struct tuple6 *tuple = NULL;
+
+    if (sess_mgr_rt == NULL || opts == NULL || mached_sess_ids == NULL || array_size == 0)
+    {
+        return mached_sess_num;
+    }
+    if (opts->count == 0)
+    {
+        return mached_sess_num;
+    }
+    capacity = session_pool_capacity_size(sess_mgr_rt->sess_pool);
+    if (opts->cursor >= capacity)
+    {
+        return mached_sess_num;
+    }
+
+    // Clamp the window end without computing cursor + count first: that sum
+    // wraps around for a large count (e.g. UINT64_MAX meaning "to the end")
+    // and would leave the loop with nothing to scan. cursor < capacity is
+    // guaranteed by the check above, so the subtraction cannot underflow.
+    remaining = capacity - opts->cursor;
+    max_loop = (opts->count < remaining) ? (opts->cursor + opts->count) : capacity;
+    for (uint64_t i = opts->cursor; i < max_loop; i++)
+    {
+        sess = session_pool_get0(sess_mgr_rt->sess_pool, i);
+        tuple = session_get_tuple6(sess);
+        if (session_get_current_state(sess) == SESSION_STATE_INIT)
+        {
+            continue;
+        }
+
+        if ((opts->flags & SESSION_SCAN_TYPE) && opts->type != session_get_type(sess))
+        {
+            continue;
+        }
+        if ((opts->flags & SESSION_SCAN_STATE) && opts->state != session_get_current_state(sess))
+        {
+            continue;
+        }
+        // Time filters are inclusive ranges: [0] is the lower bound, [1] the upper bound.
+        if ((opts->flags & SESSION_SCAN_CREATE_TIME) &&
+            (session_get_timestamp(sess, SESSION_TIMESTAMP_START) < opts->create_time_ms[0] ||
+             session_get_timestamp(sess, SESSION_TIMESTAMP_START) > opts->create_time_ms[1]))
+        {
+            continue;
+        }
+        if ((opts->flags & SESSION_SCAN_LAST_PKT_TIME) &&
+            (session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) < opts->last_pkt_time_ms[0] ||
+             session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) > opts->last_pkt_time_ms[1]))
+        {
+            continue;
+        }
+        if ((opts->flags & SESSION_SCAN_SPORT) && opts->src_port != tuple->src_port)
+        {
+            continue;
+        }
+        if ((opts->flags & SESSION_SCAN_DPORT) && opts->dst_port != tuple->dst_port)
+        {
+            continue;
+        }
+        // Address filters first require the same address family, then test
+        // the inclusive range [addr[0], addr[1]] for that family.
+        if (opts->flags & SESSION_SCAN_SIP)
+        {
+            if (opts->addr_family != tuple->addr_family)
+            {
+                continue;
+            }
+            if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->src_addr.v4, &opts->src_addr[0].v4, &opts->src_addr[1].v4))
+            {
+                continue;
+            }
+            if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->src_addr.v6, &opts->src_addr[0].v6, &opts->src_addr[1].v6))
+            {
+                continue;
+            }
+        }
+        if (opts->flags & SESSION_SCAN_DIP)
+        {
+            if (opts->addr_family != tuple->addr_family)
+            {
+                continue;
+            }
+            if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->dst_addr.v4, &opts->dst_addr[0].v4, &opts->dst_addr[1].v4))
+            {
+                continue;
+            }
+            if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->dst_addr.v6, &opts->dst_addr[0].v6, &opts->dst_addr[1].v6))
+            {
+                continue;
+            }
+        }
+
+        mached_sess_ids[mached_sess_num++] = session_get_id(sess);
+        // Stop as soon as the caller's output array is full.
+        if (mached_sess_num >= array_size)
+        {
+            break;
+        }
+    }
+
+    SESSION_MANAGER_RUNTIME_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num);
+    return mached_sess_num;
+}
+
+/******************************************************************************
+ * other
+ ******************************************************************************/
+
+/*
+ * Apply the USER_CLOSE transition to a session.
+ *
+ * The next state is computed with session_transition_run(), logged and stored
+ * on the session. Depending on the session type, session_timer_update() is then
+ * called with now_ms plus the TCP or UDP discard_default timeout, and the
+ * matching per-protocol state counters are updated through SESS_MGR_STAT_UPDATE.
+ *
+ * sess must not be NULL: it is dereferenced unconditionally.
+ * NOTE(review): for a type other than TCP/UDP only assert(0) fires; when
+ * assertions are compiled out the state has already been changed but neither
+ * the timer nor the counters are touched -- confirm this is acceptable.
+ */
+void session_set_discard(struct session *sess)
+{
+    struct session_manager_runtime *sess_mgr_rt = sess->sess_mgr_rt;
+    enum session_type type = session_get_type(sess);
+    enum session_state curr_state = session_get_current_state(sess);
+    enum session_state next_state = session_transition_run(curr_state, USER_CLOSE);
+    session_transition_log(sess, curr_state, next_state, USER_CLOSE);
+    session_set_current_state(sess, next_state);
+
+    switch (type)
+    {
+    case SESSION_TYPE_TCP:
+        session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.discard_default);
+        SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
+        break;
+    case SESSION_TYPE_UDP:
+        session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default);
+        SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
+        break;
+    default:
+        assert(0);
+        break;
+    }
+}
\ No newline at end of file diff --git a/infra/session_manager/session_manager.h b/infra/session_manager/session_manager_runtime.h index 910c6a7..910c6a7 100644 --- a/infra/session_manager/session_manager.h +++ b/infra/session_manager/session_manager_runtime.h diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c index 062eb53..83d6888 100644 --- a/infra/session_manager/session_utils.c +++ b/infra/session_manager/session_utils.c @@ -1,5 +1,6 @@ +#include "stellar/exdata.h" #include "session_internal.h" -#include "session_manager.h" +#include "session_manager_runtime.h" void session_init(struct session *sess) { @@ -208,6 +209,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess) { sess->sess_mgr_stat->tcp_segs_consumed++; half->in_order_ref++; + half->in_order.user_data = sess; return &half->in_order; } else @@ -221,6 +223,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess) // TODO sess->sess_mgr_stat->tcp_segs_consumed++; sess->sess_mgr_stat->tcp_segs_reordered++; + seg->user_data = sess; } return seg; } @@ -461,3 +464,15 @@ void session_print(const struct session *sess) session_to_str(sess, 0, buff, sizeof(buff)); printf("%s\n", buff); } + +void session_set_exdata(struct session *sess, int idx, void *ex_ptr) +{ + struct exdata_runtime *rte = (struct exdata_runtime *)session_get_user_data(sess); + exdata_set(rte, idx, ex_ptr); +} + +void *session_get_exdata(const struct session *sess, int idx) +{ + struct exdata_runtime *rte = (struct exdata_runtime *)session_get_user_data(sess); + return exdata_get(rte, idx); +} diff --git a/infra/session_manager/test/default_config.h b/infra/session_manager/test/default_config.h index 729caed..f8ce5fd 100644 --- a/infra/session_manager/test/default_config.h +++ b/infra/session_manager/test/default_config.h @@ -7,7 +7,7 @@ extern "C" { #endif -#include "session_manager.h" +#include "session_manager_runtime.h" static struct session_manager_config sess_mgr_cfg 
= { .session_id_seed = 0xFFFFF, diff --git a/infra/stellar_core.c b/infra/stellar_core.c index b3e0471..56131d6 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -17,7 +17,7 @@ #include "stellar_stat.h" #include "packet_internal.h" #include "session_internal.h" -#include "session_manager.h" +#include "session_manager_runtime.h" #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) diff --git a/infra/stellar_stat.h b/infra/stellar_stat.h index 3560ce8..7d334f7 100644 --- a/infra/stellar_stat.h +++ b/infra/stellar_stat.h @@ -7,7 +7,7 @@ extern "C" #include "packet_io.h" #include "ip_reassembly.h" -#include "session_manager.h" +#include "session_manager_runtime.h" struct thread_stat { diff --git a/infra/tcp_reassembly/tcp_reassembly.c b/infra/tcp_reassembly/tcp_reassembly.c index 2591ab2..3eb7f18 100644 --- a/infra/tcp_reassembly/tcp_reassembly.c +++ b/infra/tcp_reassembly/tcp_reassembly.c @@ -255,28 +255,4 @@ uint32_t tcp_reassembly_get_recv_next(struct tcp_reassembly *tcp_reass) } return tcp_reass->recv_next; -} - -const char *tcp_segment_get_data(const struct tcp_segment *seg) -{ - if (seg == NULL) - { - return NULL; - } - else - { - return (const char *)seg->data; - } -} - -uint16_t tcp_segment_get_len(const struct tcp_segment *seg) -{ - if (seg == NULL) - { - return 0; - } - else - { - return seg->len; - } }
\ No newline at end of file diff --git a/infra/tcp_reassembly/tcp_reassembly.h b/infra/tcp_reassembly/tcp_reassembly.h index 50e369b..87660c4 100644 --- a/infra/tcp_reassembly/tcp_reassembly.h +++ b/infra/tcp_reassembly/tcp_reassembly.h @@ -12,6 +12,7 @@ struct tcp_segment { uint32_t len; const void *data; + void *user_data; }; struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len); diff --git a/infra/version.map b/infra/version.map index ea3336d..f241d50 100644 --- a/infra/version.map +++ b/infra/version.map @@ -18,9 +18,6 @@ global: packet_build_udp; packet_build_l3; - tcp_segment_get_data; - tcp_segment_get_len; - exdata_*; mq_*; stellar_module_*; @@ -57,6 +54,9 @@ global: log_print; log_check_level; + session_manager_module_on_init; + session_manager_module_on_exit; + http_message_*; http_decoder_init; http_decoder_exit; |
