summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/stellar/packet.h3
-rw-r--r--include/stellar/packet_manager.h7
-rw-r--r--include/stellar/session.h15
-rw-r--r--include/stellar/session_manager.h27
-rw-r--r--infra/packet_manager/CMakeLists.txt2
-rw-r--r--infra/packet_manager/packet_manager.c44
-rw-r--r--infra/packet_manager/packet_manager_internal.h1
-rw-r--r--infra/packet_manager/packet_manager_private.h46
-rw-r--r--infra/packet_manager/packet_utils.c13
-rw-r--r--infra/session_manager/CMakeLists.txt5
-rw-r--r--infra/session_manager/session_manager.c1754
-rw-r--r--infra/session_manager/session_manager_runtime.c1462
-rw-r--r--infra/session_manager/session_manager_runtime.h (renamed from infra/session_manager/session_manager.h)0
-rw-r--r--infra/session_manager/session_utils.c17
-rw-r--r--infra/session_manager/test/default_config.h2
-rw-r--r--infra/stellar_core.c2
-rw-r--r--infra/stellar_stat.h2
-rw-r--r--infra/tcp_reassembly/tcp_reassembly.c24
-rw-r--r--infra/tcp_reassembly/tcp_reassembly.h1
-rw-r--r--infra/version.map6
20 files changed, 1935 insertions, 1498 deletions
diff --git a/include/stellar/packet.h b/include/stellar/packet.h
index 5ea2e75..e910237 100644
--- a/include/stellar/packet.h
+++ b/include/stellar/packet.h
@@ -185,6 +185,9 @@ uint16_t packet_get_raw_len(const struct packet *pkt);
const char *packet_get_payload(const struct packet *pkt);
uint16_t packet_get_payload_len(const struct packet *pkt);
+void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr);
+void *packet_get_exdata(struct packet *pkt, int idx);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/stellar/packet_manager.h b/include/stellar/packet_manager.h
index ebbff80..8270c7b 100644
--- a/include/stellar/packet_manager.h
+++ b/include/stellar/packet_manager.h
@@ -5,7 +5,8 @@ extern "C"
{
#endif
-#include "packet.h"
+#include "stellar/exdata.h"
+#include "stellar/packet.h"
enum packet_stage
{
@@ -19,8 +20,10 @@ enum packet_stage
struct packet_manager;
+int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg);
+
typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args);
-int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args);
+int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *args);
// if two modules claim the same packet at the same stage, the second 'claim' fails.
// return 0 on success
diff --git a/include/stellar/session.h b/include/stellar/session.h
index a1aa684..67c8c98 100644
--- a/include/stellar/session.h
+++ b/include/stellar/session.h
@@ -5,20 +5,8 @@ extern "C"
{
#endif
-#include <stdint.h>
-
#include "stellar/packet.h"
-struct tcp_segment;
-const char *tcp_segment_get_data(const struct tcp_segment *seg);
-uint16_t tcp_segment_get_len(const struct tcp_segment *seg);
-
-#define TOPIC_TCP_STREAM "TCP_STREAM" //topic message: tcp_segment
-#define TOPIC_CONTROL_PACKET "CONTROL_PACKET" //topic message: packet
-
-#define TOPIC_TCP "TCP" //topic message: session
-#define TOPIC_UDP "UDP" //topic message: session
-
enum session_state
{
SESSION_STATE_INIT = 0,
@@ -155,6 +143,9 @@ const char *session_get0_readable_addr(const struct session *sess);
void session_set_discard(struct session *sess);
+void session_set_exdata(struct session *sess, int idx, void *ex_ptr);
+void *session_get_exdata(const struct session *sess, int idx);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/stellar/session_manager.h b/include/stellar/session_manager.h
new file mode 100644
index 0000000..81fc316
--- /dev/null
+++ b/include/stellar/session_manager.h
@@ -0,0 +1,27 @@
+
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stellar/exdata.h"
+#include "stellar/session.h"
+
+struct session_manager;
+
+int session_manager_new_packet_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg);
+
+typedef void on_session_callback(struct session *sess, struct packet *pkt, void *args);
+typedef void on_tcp_stream_callback(struct session *sess, const char *tcp_payload, uint32_t tcp_payload_len, void *args);
+
+int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_callback *cb, void *args);
+int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_callback *cb, void *args);
+int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_callback *cb, void *args);
+int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt
index ada4303..24cae44 100644
--- a/infra/packet_manager/CMakeLists.txt
+++ b/infra/packet_manager/CMakeLists.txt
@@ -11,6 +11,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger)
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra)
-target_link_libraries(packet_manager tuple logger dablooms mq)
+target_link_libraries(packet_manager tuple logger dablooms mq exdata)
add_subdirectory(test) \ No newline at end of file
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c
index 44efca8..2c422d3 100644
--- a/infra/packet_manager/packet_manager.c
+++ b/infra/packet_manager/packet_manager.c
@@ -18,6 +18,7 @@ struct packet_manager_config
struct packet_manager_schema
{
+ struct exdata_schema *exdata;
struct mq_schema *mq;
int topic_id[PACKET_STAGE_MAX];
};
@@ -50,17 +51,17 @@ const char *packet_stage_to_str(enum packet_stage stage)
switch (stage)
{
case PACKET_STAGE_PREROUTING:
- return "prerouting";
+ return "PACKET_STAGE_PREROUTING";
case PACKET_STAGE_INPUT:
- return "input";
+ return "PACKET_STAGE_INPUT";
case PACKET_STAGE_FORWARD:
- return "forward";
+ return "PACKET_STAGE_FORWARD";
case PACKET_STAGE_OUTPUT:
- return "output";
+ return "PACKET_STAGE_OUTPUT";
case PACKET_STAGE_POSTROUTING:
- return "postrouting";
+ return "PACKET_STAGE_POSTROUTING";
default:
- return "unknown";
+ return "PACKET_STAGE_UNKNOWN";
}
}
@@ -102,7 +103,7 @@ static struct packet_manager_config *packet_manager_config_new(const char *toml_
* packet manager schema
******************************************************************************/
-static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg)
+static void on_packet_stage_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg)
{
assert(msg);
assert(dispatch_arg);
@@ -120,7 +121,7 @@ static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_fu
}
}
- ((on_packet_stage_callback *)cb)(stage, pkt, cb_arg);
+ ((on_packet_stage_callback *)(void *)cb)(stage, pkt, cb_arg);
}
static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_schema)
@@ -138,6 +139,11 @@ static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_sch
}
}
+ if (pkt_mgr_schema->exdata)
+ {
+ exdata_schema_free(pkt_mgr_schema->exdata);
+ }
+
free(pkt_mgr_schema);
pkt_mgr_schema = NULL;
}
@@ -152,11 +158,17 @@ static struct packet_manager_schema *packet_manager_schema_new(struct mq_schema
return NULL;
}
- pkt_mgr_schema->mq = mq;
+ pkt_mgr_schema->exdata = exdata_schema_new();
+ if (pkt_mgr_schema->exdata == NULL)
+ {
+ PACKET_MANAGER_LOG_ERROR("failed to create exdata_schema");
+ goto error_out;
+ }
+ pkt_mgr_schema->mq = mq;
for (int i = 0; i < PACKET_STAGE_MAX; i++)
{
- pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), (on_msg_dispatch_cb_func *)on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL);
+ pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), &on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL);
if (pkt_mgr_schema->topic_id[i] < 0)
{
PACKET_MANAGER_LOG_ERROR("failed to create topic %s", packet_stage_to_str(i));
@@ -276,7 +288,12 @@ void packet_manager_free(struct packet_manager *pkt_mgr)
}
}
-int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args)
+int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg)
+{
+ return exdata_schema_new_index(pkt_mgr->schema->exdata, name, func, arg);
+}
+
+int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *args)
{
return mq_schema_subscribe(pkt_mgr->schema->mq, pkt_mgr->schema->topic_id[stage], (on_msg_cb_func *)cb, args);
}
@@ -291,6 +308,8 @@ void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, str
void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
+ struct exdata_runtime *exdata_rt = exdata_runtime_new(pkt_mgr->schema->exdata);
+ packet_set_user_data(pkt, exdata_rt);
runtime->stat.total.pkts_ingress++;
runtime->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++;
@@ -307,6 +326,9 @@ struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t th
runtime->stat.total.pkts_egress++;
runtime->stat.queue[PACKET_STAGE_MAX].pkts_out++;
TAILQ_REMOVE(&runtime->queue[PACKET_STAGE_MAX], pkt, stage_tqe);
+
+ struct exdata_runtime *exdata_rt = packet_get_user_data(pkt);
+ exdata_runtime_free(exdata_rt);
}
return pkt;
}
diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h
index f8ac98e..13bdc3b 100644
--- a/infra/packet_manager/packet_manager_internal.h
+++ b/infra/packet_manager/packet_manager_internal.h
@@ -5,6 +5,7 @@ extern "C"
{
#endif
+#include "stellar/mq.h"
#include "stellar/packet_manager.h"
#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1)
diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h
deleted file mode 100644
index 4fe6a36..0000000
--- a/infra/packet_manager/packet_manager_private.h
+++ /dev/null
@@ -1,46 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "stellar/packet_manager.h"
-
-#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1)
-
-struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file);
-void packet_manager_free(struct packet_manager *pkt_mgr);
-
-void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt);
-void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
-struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt);
-void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt);
-
-/******************************************************************************
- * for gtest
- ******************************************************************************/
-
-struct packet_manager_stat
-{
- struct
- {
- uint64_t pkts_ingress;
- uint64_t pkts_egress;
- } total;
- struct
- {
- uint64_t pkts_in; // include the packets that are scheduled
- uint64_t pkts_out; // include the packets that are claimed
- uint64_t pkts_claim;
- uint64_t pkts_schedule;
- } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets
-} __attribute__((aligned(64)));
-
-const char *packet_stage_to_str(enum packet_stage stage);
-void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime);
-struct packet_manager_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime);
-
-#ifdef __cplusplus
-}
-#endif
diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c
index a9d4f80..331afee 100644
--- a/infra/packet_manager/packet_utils.c
+++ b/infra/packet_manager/packet_utils.c
@@ -5,6 +5,7 @@
#include "log_internal.h"
#include "packet_helper.h"
#include "packet_internal.h"
+#include "stellar/exdata.h"
#define PACKET_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet", format, ##__VA_ARGS__)
@@ -951,3 +952,15 @@ int packet_is_fragment(const struct packet *pkt)
{
return (pkt->frag_layer) ? 1 : 0;
}
+
+void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr)
+{
+ struct exdata_runtime *exdata_rt = (struct exdata_runtime *)packet_get_user_data(pkt);
+ exdata_set(exdata_rt, idx, ex_ptr);
+}
+
+void *packet_get_exdata(struct packet *pkt, int idx)
+{
+ struct exdata_runtime *exdata_rt = (struct exdata_runtime *)packet_get_user_data(pkt);
+ return exdata_get(exdata_rt, idx);
+}
diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt
index 5404fde..a045740 100644
--- a/infra/session_manager/CMakeLists.txt
+++ b/infra/session_manager/CMakeLists.txt
@@ -4,12 +4,13 @@ add_library(session_manager
session_table.c
session_timer.c
session_filter.c
- session_manager.c
session_transition.c
+ session_manager_runtime.c
+ session_manager.c
)
target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
-target_link_libraries(session_manager timeout packet_manager tcp_reassembly)
+target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata)
add_subdirectory(test) \ No newline at end of file
diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c
index c56258d..e8b2a08 100644
--- a/infra/session_manager/session_manager.c
+++ b/infra/session_manager/session_manager.c
@@ -1,1462 +1,430 @@
-#include <time.h>
-#include <stdlib.h>
#include <assert.h>
-#include <errno.h>
+#include <stdlib.h>
+
+#include "stellar/exdata.h"
+#include "stellar/packet_manager.h"
+#include "stellar/session_manager.h"
+#include "stellar/module_manager.h"
#include "utils.h"
-#include "packet_helper.h"
-#include "packet_filter.h"
#include "session_internal.h"
-#include "session_pool.h"
-#include "session_table.h"
-#include "session_timer.h"
-#include "session_filter.h"
-#include "session_manager.h"
-#include "session_transition.h"
+#include "session_manager_runtime.h"
-#define SESSION_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session", format, ##__VA_ARGS__)
-#define SESSION_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session", format, ##__VA_ARGS__)
-#define SESSION_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session", format, ##__VA_ARGS__)
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+#pragma GCC diagnostic ignored "-Wunused-function"
-struct snowflake
-{
- uint64_t seed;
- uint64_t sequence;
-};
+#define SESSION_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
+#define SESSION_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
+#define SESSION_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager", format, ##__VA_ARGS__)
-struct session_manager_runtime
+struct session_manager_schema
{
- struct session_list evicte_list;
- struct session_pool *sess_pool;
- struct session_timer *sess_timer;
- struct session_table *tcp_sess_table;
- struct session_table *udp_sess_table;
-
- struct packet_filter *dup_pkt_filter;
- struct session_filter *evicte_sess_filter;
+ struct exdata_schema *exdata;
+ struct mq_schema *mq;
- struct session_manager_stat stat;
- struct session_manager_config cfg;
+ int pkt_exdata_idx;
- /*
- * only used for session_set_discard() or session_manager_runtime_record_duplicated_packet(),
- * because the function is called by plugin and has no time input.
- */
- uint64_t now_ms;
- uint64_t last_clean_expired_sess_ts;
- struct snowflake *sf;
+ int topic_id_tcp;
+ int topic_id_udp;
+ int topic_id_ctrl_pkt;
+ int topic_id_tcp_stream;
};
-#define EVICTE_SESSION_BURST (RX_BURST_MAX)
-
-/******************************************************************************
- * session manager stat macro
- ******************************************************************************/
-
-#define SESS_MGR_STAT_INC(stat, state, proto) \
- { \
- switch ((state)) \
- { \
- case SESSION_STATE_OPENING: \
- (stat)->proto##_sess_opening++; \
- break; \
- case SESSION_STATE_ACTIVE: \
- (stat)->proto##_sess_active++; \
- break; \
- case SESSION_STATE_CLOSING: \
- (stat)->proto##_sess_closing++; \
- break; \
- case SESSION_STATE_DISCARD: \
- (stat)->proto##_sess_discard++; \
- break; \
- case SESSION_STATE_CLOSED: \
- (stat)->proto##_sess_closed++; \
- break; \
- default: \
- break; \
- } \
- }
-
-#define SESS_MGR_STAT_DEC(stat, state, proto) \
- { \
- switch ((state)) \
- { \
- case SESSION_STATE_OPENING: \
- (stat)->proto##_sess_opening--; \
- break; \
- case SESSION_STATE_ACTIVE: \
- (stat)->proto##_sess_active--; \
- break; \
- case SESSION_STATE_CLOSING: \
- (stat)->proto##_sess_closing--; \
- break; \
- case SESSION_STATE_DISCARD: \
- (stat)->proto##_sess_discard--; \
- break; \
- case SESSION_STATE_CLOSED: \
- (stat)->proto##_sess_closed--; \
- break; \
- default: \
- break; \
- } \
- }
-
-#define SESS_MGR_STAT_UPDATE(stat, curr, next, proto) \
- { \
- if (curr != next) \
- { \
- SESS_MGR_STAT_DEC(stat, curr, proto); \
- SESS_MGR_STAT_INC(stat, next, proto); \
- } \
- }
-
-/******************************************************************************
- * snowflake
- ******************************************************************************/
-
-static struct snowflake *snowflake_new(uint64_t seed)
-{
- struct snowflake *sf = (struct snowflake *)calloc(1, sizeof(struct snowflake));
- if (sf == NULL)
- {
- return NULL;
- }
-
- sf->seed = seed & 0xFFFFF;
- sf->sequence = 0;
-
- return sf;
-}
-
-static void snowflake_free(struct snowflake *sf)
-{
- if (sf != NULL)
- {
- free(sf);
- sf = NULL;
- }
-}
-
-/*
- * high -> low
- *
- * +------+------------------+----------------+------------------------+---------------------------+
- * | 1bit | 12bit device_id | 8bit thread_id | 28bit timestamp in sec | 15bit sequence per thread |
- * +------+------------------+----------------+------------------------+---------------------------+
- */
-
-#define MAX_ID_PER_THREAD (32768)
-#define MAX_ID_BASE_TIME (268435456L)
-
-static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec)
+struct session_manager
{
- uint64_t id = 0;
- uint64_t id_per_thread = (sf->sequence++) % MAX_ID_PER_THREAD;
- uint64_t id_base_time = now_sec % MAX_ID_BASE_TIME;
-
- id = (sf->seed << 43) | (id_base_time << 15) | (id_per_thread);
-
- return id;
-}
+ uint16_t thread_num;
+ struct session_manager_config *cfg;
+ struct session_manager_schema *schema;
+ struct session_manager_runtime *runtime[MAX_THREAD_NUM];
+ struct stellar_module_manager *mod_mgr;
+};
/******************************************************************************
- * TCP utils
+ * callback
******************************************************************************/
-static void tcp_clean(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
-{
- struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[FLOW_TYPE_C2S].assembler;
- struct tcp_reassembly *s2c_ssembler = sess->tcp_halfs[FLOW_TYPE_S2C].assembler;
- struct tcp_segment *seg;
- if (c2s_ssembler)
- {
- while ((seg = tcp_reassembly_expire(c2s_ssembler, UINT64_MAX)))
- {
- session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_SEGMENTS_RELEASED, 1);
- session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_PAYLOADS_RELEASED, seg->len);
- sess_mgr_rt->stat.tcp_segs_freed++;
- tcp_segment_free(seg);
- }
- tcp_reassembly_free(c2s_ssembler);
- }
- if (s2c_ssembler)
- {
- while ((seg = tcp_reassembly_expire(s2c_ssembler, UINT64_MAX)))
- {
- session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_SEGMENTS_RELEASED, 1);
- session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_PAYLOADS_RELEASED, seg->len);
- sess_mgr_rt->stat.tcp_segs_freed++;
- tcp_segment_free(seg);
- }
- tcp_reassembly_free(s2c_ssembler);
- }
-}
-
-static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
-{
- if (!sess_mgr_rt->cfg.tcp_reassembly.enable)
- {
- return 0;
- }
-
- sess->tcp_halfs[FLOW_TYPE_C2S].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
- sess->tcp_halfs[FLOW_TYPE_S2C].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
- if (sess->tcp_halfs[FLOW_TYPE_C2S].assembler == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].assembler == NULL)
- {
- tcp_clean(sess_mgr_rt, sess);
- return -1;
- }
-
- SESSION_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p",
- session_get_id(sess), session_get0_readable_addr(sess),
- sess->tcp_halfs[FLOW_TYPE_C2S].assembler,
- sess->tcp_halfs[FLOW_TYPE_S2C].assembler);
-
- return 0;
-}
-
-static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum flow_type type, const struct layer_private *tcp_layer)
-{
- struct tcp_segment *seg;
- struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr;
- struct tcp_half *half = &sess->tcp_halfs[type];
- uint8_t flags = tcp_hdr_get_flags(hdr);
- uint16_t len = tcp_layer->pld_len;
-
- if ((flags & TH_SYN) && half->isn == 0)
- {
- half->isn = tcp_hdr_get_seq(hdr);
- }
- half->flags = flags;
- half->history |= flags;
- half->seq = tcp_hdr_get_seq(hdr);
- half->ack = tcp_hdr_get_ack(hdr);
- half->len = tcp_layer->pld_len;
-
- if (!sess_mgr_rt->cfg.tcp_reassembly.enable)
- {
- if (len)
- {
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len);
- sess_mgr_rt->stat.tcp_segs_input++;
-
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len);
- sess_mgr_rt->stat.tcp_segs_inorder++;
-
- half->in_order.data = tcp_layer->pld_ptr;
- half->in_order.len = len;
- half->in_order_ref = 0;
- }
- return;
- }
-
- if (unlikely(flags & TH_SYN))
- {
- // len > 0 is SYN with data (TCP Fast Open)
- tcp_reassembly_set_recv_next(half->assembler, len ? half->seq : half->seq + 1);
- }
-
- seg = tcp_reassembly_expire(half->assembler, sess_mgr_rt->now_ms);
- if (seg)
- {
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_EXPIRED, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_EXPIRED, seg->len);
- sess_mgr_rt->stat.tcp_segs_timeout++;
-
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RELEASED, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RELEASED, seg->len);
- sess_mgr_rt->stat.tcp_segs_freed++;
-
- tcp_segment_free(seg);
- }
-
- if (len)
- {
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len);
- sess_mgr_rt->stat.tcp_segs_input++;
-
- uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler);
- // in order
- if (half->seq == rcv_nxt)
- {
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len);
- sess_mgr_rt->stat.tcp_segs_inorder++;
-
- half->in_order.data = tcp_layer->pld_ptr;
- half->in_order.len = len;
- half->in_order_ref = 0;
- tcp_reassembly_inc_recv_next(half->assembler, len);
- }
- // retransmission
- else if (uint32_before(uint32_add(half->seq, len), rcv_nxt))
- {
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len);
- sess_mgr_rt->stat.tcp_segs_retransmited++;
- }
- else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len)))
- {
- switch (tcp_reassembly_push(half->assembler, seg, sess_mgr_rt->now_ms))
- {
- case -2:
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len);
- sess_mgr_rt->stat.tcp_segs_retransmited++;
- tcp_segment_free(seg);
- break;
- case -1:
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len);
- sess_mgr_rt->stat.tcp_segs_omitted_too_many++;
- tcp_segment_free(seg);
- break;
- case 0:
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len);
- sess_mgr_rt->stat.tcp_segs_buffered++;
- break;
- case 1:
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_OVERLAP, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_OVERLAP, len);
- sess_mgr_rt->stat.tcp_segs_overlapped++;
-
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len);
- sess_mgr_rt->stat.tcp_segs_buffered++;
- break;
- default:
- assert(0);
- break;
- }
- }
- else
- {
- session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1);
- session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len);
- sess_mgr_rt->stat.tcp_segs_omitted_too_many++;
- }
- }
+static void on_session_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
+{
+ struct session *sess = (struct session *)msg;
+ struct packet *pkt = (struct packet *)session_get0_current_packet(sess);
+
+ ((on_session_callback *)(void *)cb)(sess, pkt, cb_args);
+}
+
+static void on_tcp_stream_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
+{
+ struct tcp_segment *seg = (struct tcp_segment *)msg;
+
+ ((on_tcp_stream_callback *)(void *)cb)(seg->user_data, seg->data, seg->len, cb_args);
+}
+
+static void on_tcp_stream_free(void *msg, void *args)
+{
+ struct tcp_segment *seg = (struct tcp_segment *)msg;
+ struct session *sess = (struct session *)seg->user_data;
+
+ session_free_tcp_segment(sess, seg);
+}
+
+static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void *args)
+{
+ struct session_manager *sess_mgr = (struct session_manager *)args;
+ struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
+ int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
+ struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
+
+ /*
+ * We use the system's real time instead of monotonic time for the following reasons:
+ * -> Session creation/closure times require real time (e.g., for logging session activities).
+ * -> Session ID generation relies on real time (e.g., for reverse calculating session creation time from the session ID).
+ *
+ * Note: Modifying the system time will affect the timing wheel, impacting session expiration, and TCP reassembly expiration.
+ * Suggestion: After modifying the system time, restart the service to ensure consistent timing.
+ */
+ uint64_t now_ms = clock_get_real_time_ms();
+
+ struct tcp_segment *seg = NULL;
+ struct session *sess = session_manager_runtime_lookup_session_by_packet(sess_mgr_rt, pkt);
+ if (sess == NULL)
+ {
+ sess = session_manager_runtime_new_session(sess_mgr_rt, pkt, now_ms);
+ if (sess == NULL)
+ {
+ goto fast_path;
+ }
+ else
+ {
+ session_set_user_data(sess, exdata_runtime_new(sess_mgr->schema->exdata));
+ goto slow_path;
+ }
+ }
+ else
+ {
+ if (session_manager_runtime_update_session(sess_mgr_rt, sess, pkt, now_ms) == -1)
+ {
+ goto fast_path;
+ }
+ else
+ {
+ goto slow_path;
+ }
+ }
+
+slow_path:
+ if (session_get_type(sess) == SESSION_TYPE_TCP)
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp, sess);
+ while ((seg = session_get_tcp_segment(sess)))
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp_stream, seg);
+ }
+ }
+ else
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess);
+ }
+
+ packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, sess);
+
+fast_path:
+ packet_set_exdata(pkt, sess_mgr->schema->pkt_exdata_idx, NULL);
+}
+
+static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *args)
+{
+ struct session_manager *sess_mgr = (struct session_manager *)args;
+ struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx);
+ if (sess)
+ {
+ enum flow_type type = session_get_flow_type(sess);
+ int is_ctrl = packet_is_ctrl(pkt);
+ uint16_t len = packet_get_raw_len(pkt);
+ switch (packet_get_action(pkt))
+ {
+ case PACKET_ACTION_DROP:
+ session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_DROPPED : STAT_RAW_PACKETS_DROPPED), 1);
+ session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_DROPPED : STAT_RAW_BYTES_DROPPED), len);
+ break;
+ case PACKET_ACTION_FORWARD:
+ session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_PACKETS_TRANSMITTED : STAT_RAW_PACKETS_TRANSMITTED), 1);
+ session_inc_stat(sess, type, (is_ctrl ? STAT_CONTROL_BYTES_TRANSMITTED : STAT_RAW_BYTES_TRANSMITTED), len);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ session_set_current_packet(sess, NULL);
+ session_set_flow_type(sess, FLOW_TYPE_NONE);
+ }
+}
+
+static int on_polling(void *args)
+{
+ struct session_manager *sess_mgr = (struct session_manager *)args;
+ struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
+ int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
+ struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
+ uint64_t now_ms = clock_get_real_time_ms();
+
+#define MAX_CLEANED_SESS 1024
+ struct session *sess = NULL;
+ struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL};
+ struct exdata_runtime *exdata_rt = NULL;
+
+ uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned_sess, MAX_CLEANED_SESS);
+ for (uint64_t j = 0; j < used; j++)
+ {
+ sess = cleaned_sess[j];
+ exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
+ exdata_runtime_free(exdata_rt);
+ session_manager_runtime_free_session(sess_mgr_rt, sess);
+ }
+
+ // TODO
+ // ouput stat to fs4
+ session_manager_runtime_print_stat(sess_mgr_rt);
+
+ if (used == MAX_CLEANED_SESS)
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
/******************************************************************************
- * session flow
+ * session manager schema
******************************************************************************/
-static enum flow_type identify_flow_type_by_port(uint16_t src_port, uint16_t dst_port)
-{
- // big port is client
- if (src_port > dst_port)
- {
- return FLOW_TYPE_C2S;
- }
- else if (src_port < dst_port)
- {
- return FLOW_TYPE_S2C;
- }
- else
- {
- // if port is equal, first packet is C2S
- return FLOW_TYPE_C2S;
- }
-}
-
-static enum flow_type identify_flow_type_by_history(const struct session *sess, const struct tuple6 *key)
-{
- if (tuple6_cmp(session_get_tuple6(sess), key) == 0)
- {
- return FLOW_TYPE_C2S;
- }
- else
- {
- return FLOW_TYPE_S2C;
- }
+void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema)
+{
+ if (sess_mgr_schema)
+ {
+ if (sess_mgr_schema->mq)
+ {
+ mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp);
+ mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_udp);
+ mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_ctrl_pkt);
+ mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp_stream);
+ }
+ exdata_schema_free(sess_mgr_schema->exdata);
+
+ free(sess_mgr_schema);
+ sess_mgr_schema = NULL;
+ }
+}
+
+struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *subscribe_args)
+{
+ if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, subscribe_args))
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
+ return NULL;
+ }
+ if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, subscribe_args))
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
+ return NULL;
+ }
+
+ struct session_manager_schema *sess_mgr_schema = calloc(1, sizeof(struct session_manager_schema));
+ if (sess_mgr_schema == NULL)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager_schema");
+ return NULL;
+ }
+
+ sess_mgr_schema->exdata = exdata_schema_new();
+ if (sess_mgr_schema->exdata == NULL)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema");
+ goto error_out;
+ }
+
+ // TODO register polling
+
+ sess_mgr_schema->mq = mq;
+ sess_mgr_schema->pkt_exdata_idx = packet_manager_new_packet_exdata_index(pkt_mgr, "session_manager", NULL, NULL);
+ if (sess_mgr_schema->pkt_exdata_idx == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
+ goto error_out;
+ }
+
+ sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "TCP", &on_session_dispatch, NULL, NULL, NULL);
+ if (sess_mgr_schema->topic_id_tcp == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create topic TCP");
+ goto error_out;
+ }
+ sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "UDP", &on_session_dispatch, NULL, NULL, NULL);
+ if (sess_mgr_schema->topic_id_udp == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create topic UDP");
+ goto error_out;
+ }
+ sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "CTRL_PKT", &on_session_dispatch, NULL, NULL, NULL);
+ if (sess_mgr_schema->topic_id_ctrl_pkt == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create topic CTRL_PKT");
+ goto error_out;
+ }
+ sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL);
+ if (sess_mgr_schema->topic_id_tcp_stream == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create topic TCP_STREAM");
+ goto error_out;
+ }
+
+ return sess_mgr_schema;
+
+error_out:
+ session_manager_schema_free(sess_mgr_schema);
+ return NULL;
}
/******************************************************************************
- * bypass packet -- table limit / session evicted / duplicated packet
+ * session manager
******************************************************************************/
-static int session_manager_runtime_bypass_packet_on_tcp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key)
+void session_manager_free(struct session_manager *sess_mgr)
{
- if (key->ip_proto == IPPROTO_TCP && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max)
- {
- sess_mgr_rt->stat.tcp_pkts_bypass_table_full++;
- return 1;
- }
- return 0;
-}
+ if (sess_mgr)
+ {
+ for (int i = 0; i < sess_mgr->thread_num; i++)
+ {
+ session_manager_runtime_free(sess_mgr->runtime[i]);
+ }
-static int session_manager_runtime_bypass_packet_on_udp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key)
-{
- if (key->ip_proto == IPPROTO_UDP && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max)
- {
- sess_mgr_rt->stat.udp_pkts_bypass_table_full++;
- return 1;
- }
- return 0;
+ session_manager_schema_free(sess_mgr->schema);
+ session_manager_config_free(sess_mgr->cfg);
+ free(sess_mgr);
+ }
}
-static int session_manager_runtime_bypass_packet_on_session_evicted(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key)
+struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file)
{
- if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable && session_filter_lookup(sess_mgr_rt->evicte_sess_filter, key, sess_mgr_rt->now_ms))
- {
- sess_mgr_rt->stat.udp_pkts_bypass_session_evicted++;
- return 1;
- }
-
- return 0;
-}
-
-static int session_manager_runtime_bypass_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
-{
- if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable == 0)
- {
- return 0;
- }
-
- enum flow_type type = identify_flow_type_by_history(sess, key);
- if (session_get_stat(sess, type, STAT_RAW_PACKETS_RECEIVED) < 3 || session_has_duplicate_traffic(sess))
- {
- if (packet_filter_lookup(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms))
- {
- session_inc_stat(sess, type, STAT_DUPLICATE_PACKETS_BYPASS, 1);
- session_inc_stat(sess, type, STAT_DUPLICATE_BYTES_BYPASS, packet_get_raw_len(pkt));
- switch (session_get_type(sess))
- {
- case SESSION_TYPE_TCP:
- sess_mgr_rt->stat.tcp_pkts_bypass_duplicated++;
- break;
- case SESSION_TYPE_UDP:
- sess_mgr_rt->stat.udp_pkts_bypass_duplicated++;
- break;
- default:
- assert(0);
- break;
- }
- session_set_duplicate_traffic(sess);
-
- session_set_current_packet(sess, pkt);
- session_set_flow_type(sess, type);
- return 1;
- }
- else
- {
- packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms);
- return 0;
- }
- }
-
- return 0;
-}
-
-void session_manager_runtime_record_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt)
-{
- if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
- {
- packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms);
- }
-}
-
-/******************************************************************************
- * config -- new / free / print
- ******************************************************************************/
-
-struct session_manager_config *session_manager_config_new(const char *toml_file)
-{
- if (toml_file == NULL)
- {
- return NULL;
- }
-
- struct session_manager_config *sess_mgr_cfg = (struct session_manager_config *)calloc(1, sizeof(struct session_manager_config));
- if (sess_mgr_cfg == NULL)
- {
- return NULL;
- }
-
- int ret = 0;
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
+ uint16_t thread_num;
+ uint64_t instance_id;
+ uint64_t now_ms = clock_get_real_time_ms();
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1);
+ if (load_and_validate_toml_integer_config(toml_file, "instance.id", (uint64_t *)&instance_id, 0, 4095))
+ {
+ return NULL;
+ }
+ if (load_and_validate_toml_integer_config(toml_file, "packet.nr_worker_thread", (uint64_t *)&thread_num, 0, MAX_THREAD_NUM))
+ {
+ return NULL;
+ }
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024);
+ struct session_manager *sess_mgr = calloc(1, sizeof(struct session_manager));
+ if (sess_mgr == NULL)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager");
+ return NULL;
+ }
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000);
+ sess_mgr->cfg = session_manager_config_new(toml_file);
+ if (sess_mgr->cfg == NULL)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create session_manager_config");
+ goto error_out;
+ }
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000);
+ sess_mgr->schema = session_manager_schema_new(pkt_mgr, mq_schema, sess_mgr);
+ if (sess_mgr->schema == NULL)
+ {
+ goto error_out;
+ }
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000);
- ret += load_and_validate_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0);
+ sess_mgr->thread_num = thread_num;
+ for (int i = 0; i < sess_mgr->thread_num; i++)
+ {
+ sess_mgr->cfg->session_id_seed = instance_id << 8 | i;
+ sess_mgr->runtime[i] = session_manager_runtime_new(sess_mgr->cfg, now_ms);
+ if (sess_mgr->runtime[i] == NULL)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create session_manager_runtime");
+ goto error_out;
+ }
+ }
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000);
- ret += load_and_validate_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0);
+ return sess_mgr;
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000);
- ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512);
-
- if (ret != 0)
- {
- session_manager_config_free(sess_mgr_cfg);
- return NULL;
- }
-
- return sess_mgr_cfg;
+error_out:
+ session_manager_free(sess_mgr);
+ return NULL;
}
-void session_manager_config_free(struct session_manager_config *sess_mgr_cfg)
+int session_manager_new_packet_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg)
{
- if (sess_mgr_cfg)
- {
- free(sess_mgr_cfg);
- sess_mgr_cfg = NULL;
- }
+ return exdata_schema_new_index(sess_mgr->schema->exdata, name, func, arg);
}
-void session_manager_config_print(struct session_manager_config *sess_mgr_cfg)
+int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_callback *cb, void *args)
{
- if (sess_mgr_cfg)
- {
- // max session number
- SESSION_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max);
- SESSION_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max);
-
- // session overload
- SESSION_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit);
- SESSION_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit);
-
- // TCP timeout
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init);
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake);
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data);
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed);
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait);
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default);
- SESSION_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst);
-
- // UDP timeout
- SESSION_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data);
- SESSION_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default);
-
- // limit
- SESSION_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms);
- SESSION_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max);
-
- // duplicated packet filter
- SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable);
- SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity);
- SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms);
- SESSION_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate);
-
- // eviction session filter
- SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable);
- SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity);
- SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms);
- SESSION_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate);
-
- // TCP reassembly
- SESSION_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable);
- SESSION_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms);
- SESSION_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max);
- }
+ return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp, (on_msg_cb_func *)(void *)cb, args);
}
-/******************************************************************************
- * runtime -- new / free
- ******************************************************************************/
-
-struct session_manager_runtime *session_manager_runtime_new(const struct session_manager_config *sess_mgr_cfg, uint64_t now_ms)
+int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_callback *cb, void *args)
{
- struct session_manager_runtime *sess_mgr_rt = (struct session_manager_runtime *)calloc(1, sizeof(struct session_manager_runtime));
- if (sess_mgr_rt == NULL)
- {
- return NULL;
- }
- memcpy(&sess_mgr_rt->cfg, sess_mgr_cfg, sizeof(struct session_manager_config));
-
- sess_mgr_rt->sess_pool = session_pool_new(sess_mgr_rt->cfg.tcp_session_max + sess_mgr_rt->cfg.udp_session_max);
- sess_mgr_rt->tcp_sess_table = session_table_new();
- sess_mgr_rt->udp_sess_table = session_table_new();
- sess_mgr_rt->sess_timer = session_timer_new(now_ms);
- if (sess_mgr_rt->sess_pool == NULL || sess_mgr_rt->tcp_sess_table == NULL || sess_mgr_rt->udp_sess_table == NULL || sess_mgr_rt->sess_timer == NULL)
- {
- goto error;
- }
- if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
- {
- sess_mgr_rt->evicte_sess_filter = session_filter_new(sess_mgr_rt->cfg.evicted_session_bloom_filter.capacity,
- sess_mgr_rt->cfg.evicted_session_bloom_filter.time_window_ms,
- sess_mgr_rt->cfg.evicted_session_bloom_filter.error_rate, now_ms);
- if (sess_mgr_rt->evicte_sess_filter == NULL)
- {
- goto error;
- }
- }
- if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
- {
- sess_mgr_rt->dup_pkt_filter = packet_filter_new(sess_mgr_rt->cfg.duplicated_packet_bloom_filter.capacity,
- sess_mgr_rt->cfg.duplicated_packet_bloom_filter.time_window_ms,
- sess_mgr_rt->cfg.duplicated_packet_bloom_filter.error_rate, now_ms);
- if (sess_mgr_rt->dup_pkt_filter == NULL)
- {
- goto error;
- }
- }
- sess_mgr_rt->sf = snowflake_new(sess_mgr_rt->cfg.session_id_seed);
- if (sess_mgr_rt->sf == NULL)
- {
- goto error;
- }
-
- TAILQ_INIT(&sess_mgr_rt->evicte_list);
- session_transition_init();
- sess_mgr_rt->now_ms = now_ms;
- sess_mgr_rt->last_clean_expired_sess_ts = now_ms;
-
- return sess_mgr_rt;
-
-error:
- session_manager_runtime_free(sess_mgr_rt);
- return NULL;
+ return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_udp, (on_msg_cb_func *)(void *)cb, args);
}
-void session_manager_runtime_free(struct session_manager_runtime *sess_mgr_rt)
+int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_callback *cb, void *args)
{
- struct session *sess;
- if (sess_mgr_rt)
- {
- // free all evicted session
- while ((sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list)))
- {
- TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe);
- session_manager_runtime_free_session(sess_mgr_rt, sess);
- }
- // free all udp session
- while (sess_mgr_rt->udp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->udp_sess_table)))
- {
- session_manager_runtime_free_session(sess_mgr_rt, sess);
- }
- // free all tcp session
- while (sess_mgr_rt->tcp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table)))
- {
- session_manager_runtime_free_session(sess_mgr_rt, sess);
- }
- if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
- {
- session_filter_free(sess_mgr_rt->evicte_sess_filter);
- }
- if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
- {
- packet_filter_free(sess_mgr_rt->dup_pkt_filter);
- }
- snowflake_free(sess_mgr_rt->sf);
- session_timer_free(sess_mgr_rt->sess_timer);
- session_table_free(sess_mgr_rt->udp_sess_table);
- session_table_free(sess_mgr_rt->tcp_sess_table);
- session_pool_free(sess_mgr_rt->sess_pool);
- free(sess_mgr_rt);
- sess_mgr_rt = NULL;
- }
+ return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_ctrl_pkt, (on_msg_cb_func *)(void *)cb, args);
}
-/******************************************************************************
- * session -- new / free / lookup / updata / expire / evicte / clean
- ******************************************************************************/
-
-static void session_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum flow_type type)
+int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args)
{
- if (session_get_current_state(sess) == SESSION_STATE_INIT)
- {
- uint64_t sess_id = snowflake_generate(sess_mgr_rt->sf, sess_mgr_rt->now_ms / 1000);
- session_set_id(sess, sess_id);
- enum packet_direction pkt_dir = packet_get_direction(pkt);
- if (type == FLOW_TYPE_C2S)
- {
- session_set_tuple6(sess, key);
- if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External
- {
- session_set_direction(sess, SESSION_DIRECTION_OUTBOUND);
- }
- else
- {
- session_set_direction(sess, SESSION_DIRECTION_INBOUND);
- }
- tuple6_to_str(key, sess->tuple_str, sizeof(sess->tuple_str));
- }
- else
- {
- struct tuple6 out;
- tuple6_reverse(key, &out);
- session_set_tuple6(sess, &out);
- if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External
- {
- session_set_direction(sess, SESSION_DIRECTION_INBOUND);
- }
- else
- {
- session_set_direction(sess, SESSION_DIRECTION_OUTBOUND);
- }
- tuple6_to_str(&out, sess->tuple_str, sizeof(sess->tuple_str));
- }
+ assert(sess_mgr);
- session_set_timestamp(sess, SESSION_TIMESTAMP_START, sess_mgr_rt->now_ms);
- switch (key->ip_proto)
- {
- case IPPROTO_TCP:
- session_set_type(sess, SESSION_TYPE_TCP);
- break;
- case IPPROTO_UDP:
- session_set_type(sess, SESSION_TYPE_UDP);
- break;
- default:
- assert(0);
- break;
- }
- }
-
- session_inc_stat(sess, type, STAT_RAW_PACKETS_RECEIVED, 1);
- session_inc_stat(sess, type, STAT_RAW_BYTES_RECEIVED, packet_get_raw_len(pkt));
-
- if (!session_get_first_packet(sess, type))
- {
- session_set_first_packet(sess, type, packet_dup(pkt));
- session_set_route_ctx(sess, type, packet_get_route_ctx(pkt));
- session_set_sids(sess, type, packet_get_sids(pkt));
- }
-
- session_set_current_packet(sess, pkt);
- session_set_flow_type(sess, type);
- session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, sess_mgr_rt->now_ms);
- session_set_current_state(sess, next_state);
-}
-
-static void session_manager_runtime_evicte_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, int reason)
-{
- if (sess == NULL)
- {
- return;
- }
-
- // when session add to evicted queue, session lifetime is over
- enum session_state curr_state = session_get_current_state(sess);
- enum session_state next_state = session_transition_run(curr_state, reason);
- session_transition_log(sess, curr_state, next_state, reason);
- session_set_current_state(sess, next_state);
- if (!session_get_closing_reason(sess))
- {
- if (reason == PORT_REUSE_EVICT)
- {
- session_set_closing_reason(sess, CLOSING_BY_PORT_REUSE_EVICTED);
- }
- if (reason == LRU_EVICT)
- {
- session_set_closing_reason(sess, CLOSING_BY_LRU_EVICTED);
- }
- }
- session_timer_del(sess_mgr_rt->sess_timer, sess);
- TAILQ_INSERT_TAIL(&sess_mgr_rt->evicte_list, sess, evicte_tqe);
-
- switch (session_get_type(sess))
- {
- case SESSION_TYPE_TCP:
- SESSION_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess));
- session_table_del(sess_mgr_rt->tcp_sess_table, sess);
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
- sess_mgr_rt->stat.tcp_sess_evicted++;
- break;
- case SESSION_TYPE_UDP:
- SESSION_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess));
- session_table_del(sess_mgr_rt->udp_sess_table, sess);
- if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
- {
- session_filter_add(sess_mgr_rt->evicte_sess_filter, session_get_tuple6(sess), sess_mgr_rt->now_ms);
- }
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
- sess_mgr_rt->stat.udp_sess_evicted++;
- break;
- default:
- assert(0);
- break;
- }
-}
-
-static struct session *session_manager_runtime_lookup_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key)
-{
- struct session *sess = session_table_find_tuple6(sess_mgr_rt->tcp_sess_table, key, 0);
- if (sess == NULL)
- {
- return NULL;
- }
-
- const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP);
- const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
- uint8_t flags = tcp_hdr_get_flags(hdr);
- if ((flags & TH_SYN) == 0)
- {
- return sess;
- }
-
- enum flow_type type = identify_flow_type_by_history(sess, key);
- struct tcp_half *half = &sess->tcp_halfs[type];
- if ((half->isn && half->isn != tcp_hdr_get_seq(hdr)) || // recv SYN with different ISN
- ((half->history & TH_FIN) || (half->history & TH_RST))) // recv SYN after FIN or RST
- {
- // TCP port reuse, evict old session
- session_manager_runtime_evicte_session(sess_mgr_rt, sess, PORT_REUSE_EVICT);
- return NULL;
- }
- else
- {
- // TCP SYN retransmission
- return sess;
- }
-}
-
-static struct session *session_manager_runtime_new_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key)
-{
- const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP);
- const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
- uint8_t flags = tcp_hdr_get_flags(hdr);
- if (!(flags & TH_SYN))
- {
- sess_mgr_rt->stat.tcp_pkts_bypass_session_not_found++;
- return NULL;
- }
-
- // tcp table full evict old session
- if (sess_mgr_rt->cfg.evict_old_on_tcp_table_limit && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max - EVICTE_SESSION_BURST)
- {
- struct session *evic_sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table);
- session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT);
- }
-
- enum flow_type type = (flags & TH_ACK) ? FLOW_TYPE_S2C : FLOW_TYPE_C2S;
- struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool);
- if (sess == NULL)
- {
- assert(0);
- return NULL;
- }
- session_init(sess);
- sess->sess_mgr_rt = sess_mgr_rt;
- sess->sess_mgr_stat = &sess_mgr_rt->stat;
-
- enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN);
- session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
- session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN);
-
- if (tcp_init(sess_mgr_rt, sess) == -1)
- {
- assert(0);
- session_pool_push(sess_mgr_rt->sess_pool, sess);
- return NULL;
- }
- tcp_update(sess_mgr_rt, sess, type, tcp_layer);
-
- uint64_t timeout = (flags & TH_ACK) ? sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init;
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout);
- session_table_add(sess_mgr_rt->tcp_sess_table, sess);
-
- if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
- {
- packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms);
- }
-
- SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, tcp);
- sess_mgr_rt->stat.tcp_sess_used++;
- sess_mgr_rt->stat.history_tcp_sessions++;
-
- return sess;
-}
-
-static struct session *session_manager_runtime_new_udp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key)
-{
- // udp table full evict old session
- if (sess_mgr_rt->cfg.evict_old_on_udp_table_limit && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max - EVICTE_SESSION_BURST)
- {
- struct session *evic_sess = session_table_find_lru(sess_mgr_rt->udp_sess_table);
- session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT);
- }
-
- struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool);
- if (sess == NULL)
- {
- assert(sess);
- return NULL;
- }
- session_init(sess);
- sess->sess_mgr_rt = sess_mgr_rt;
- sess->sess_mgr_stat = &sess_mgr_rt->stat;
-
- enum flow_type type = identify_flow_type_by_port(ntohs(key->src_port), ntohs(key->dst_port));
- enum session_state next_state = session_transition_run(SESSION_STATE_INIT, UDP_DATA);
- session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
- session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA);
-
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data);
- session_table_add(sess_mgr_rt->udp_sess_table, sess);
-
- SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, udp);
- sess_mgr_rt->stat.udp_sess_used++;
- sess_mgr_rt->stat.history_udp_sessions++;
-
- return sess;
-}
-
-static int session_manager_runtime_update_tcp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
-{
- const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP);
- const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
- enum flow_type type = identify_flow_type_by_history(sess, key);
- uint8_t flags = tcp_hdr_get_flags(hdr);
- int inputs = 0;
- inputs |= (flags & TH_SYN) ? TCP_SYN : NONE;
- inputs |= (flags & TH_FIN) ? TCP_FIN : NONE;
- inputs |= (flags & TH_RST) ? TCP_RST : NONE;
- inputs |= tcp_layer->pld_len ? TCP_DATA : NONE;
-
- // update state
- enum session_state curr_state = session_get_current_state(sess);
- enum session_state next_state = session_transition_run(curr_state, inputs);
-
- // update session
- session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
- session_transition_log(sess, curr_state, next_state, inputs);
-
- // update tcp
- tcp_update(sess_mgr_rt, sess, type, tcp_layer);
-
- // set closing reason
- if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess))
- {
- if (flags & TH_FIN)
- {
- session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN));
- }
- if (flags & TH_RST)
- {
- session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST));
- }
- }
-
- // update timeout
- struct tcp_half *curr = &sess->tcp_halfs[type];
- struct tcp_half *peer = &sess->tcp_halfs[(type == FLOW_TYPE_C2S ? FLOW_TYPE_S2C : FLOW_TYPE_C2S)];
- uint64_t timeout = 0;
- switch (next_state)
- {
- case SESSION_STATE_OPENING:
- if (flags & TH_SYN)
- {
- timeout = (flags & TH_ACK) ? sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init;
- }
- else
- {
- timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data;
- }
- break;
- case SESSION_STATE_ACTIVE:
- timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data;
- break;
- case SESSION_STATE_CLOSING:
- if (flags & TH_FIN)
- {
- timeout = (peer->history & TH_FIN) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.half_closed;
- }
- else if (flags & TH_RST)
- {
- // if fin is received, the expected sequence number should be increased by 1
- uint32_t expected = (peer->history & TH_FIN) ? peer->ack + 1 : peer->ack;
- timeout = (expected == curr->seq) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.unverified_rst;
- }
- else
- {
- timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data;
- }
- break;
- case SESSION_STATE_DISCARD:
- timeout = sess_mgr_rt->cfg.tcp_timeout_ms.discard_default;
- break;
- default:
- assert(0);
- break;
- }
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout);
-
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
-
- return 0;
-}
-
-static int session_manager_runtime_update_udp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
-{
- enum flow_type type = identify_flow_type_by_history(sess, key);
- enum session_state curr_state = session_get_current_state(sess);
- enum session_state next_state = session_transition_run(curr_state, UDP_DATA);
- session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
- session_transition_log(sess, curr_state, next_state, UDP_DATA);
-
- if (session_get_current_state(sess) == SESSION_STATE_DISCARD)
- {
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default);
- }
- else
- {
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data);
- }
-
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
-
- return 0;
-}
-
-struct session *session_manager_runtime_new_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, uint64_t now_ms)
-{
- sess_mgr_rt->now_ms = now_ms;
-
- struct tuple6 key;
- if (packet_get_innermost_tuple6(pkt, &key))
- {
- return NULL;
- }
- switch (key.ip_proto)
- {
- case IPPROTO_TCP:
- if (session_manager_runtime_bypass_packet_on_tcp_table_limit(sess_mgr_rt, &key))
- {
- return NULL;
- }
- return session_manager_runtime_new_tcp_session(sess_mgr_rt, pkt, &key);
- case IPPROTO_UDP:
- if (session_manager_runtime_bypass_packet_on_session_evicted(sess_mgr_rt, &key))
- {
- return NULL;
- }
- if (session_manager_runtime_bypass_packet_on_udp_table_limit(sess_mgr_rt, &key))
- {
- return NULL;
- }
- return session_manager_runtime_new_udp_session(sess_mgr_rt, pkt, &key);
- default:
- return NULL;
- }
-}
-
-void session_manager_runtime_free_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
-{
- if (sess)
- {
- SESSION_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess)));
-
- session_timer_del(sess_mgr_rt->sess_timer, sess);
- switch (session_get_type(sess))
- {
- case SESSION_TYPE_TCP:
- tcp_clean(sess_mgr_rt, sess);
- if (session_table_find_sessid(sess_mgr_rt->tcp_sess_table, session_get_id(sess), 0) == sess)
- {
- session_table_del(sess_mgr_rt->tcp_sess_table, sess);
- }
- SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), tcp);
- sess_mgr_rt->stat.tcp_sess_used--;
- break;
- case SESSION_TYPE_UDP:
- if (session_table_find_sessid(sess_mgr_rt->udp_sess_table, session_get_id(sess), 0) == sess)
- {
- session_table_del(sess_mgr_rt->udp_sess_table, sess);
- }
- SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), udp);
- sess_mgr_rt->stat.udp_sess_used--;
- break;
- default:
- assert(0);
- break;
- }
-
- packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_C2S));
- packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_S2C));
- session_set_first_packet(sess, FLOW_TYPE_C2S, NULL);
- session_set_first_packet(sess, FLOW_TYPE_S2C, NULL);
- session_clear_route_ctx(sess, FLOW_TYPE_C2S);
- session_clear_route_ctx(sess, FLOW_TYPE_S2C);
- session_clear_sids(sess, FLOW_TYPE_C2S);
- session_clear_sids(sess, FLOW_TYPE_S2C);
- session_set_current_state(sess, SESSION_STATE_INIT);
- session_set_current_packet(sess, NULL);
- session_set_flow_type(sess, FLOW_TYPE_NONE);
- session_pool_push(sess_mgr_rt->sess_pool, sess);
- sess = NULL;
- }
-}
-
-struct session *session_manager_runtime_lookup_session_by_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt)
-{
- struct tuple6 key;
- if (packet_get_innermost_tuple6(pkt, &key))
- {
- return NULL;
- }
- switch (key.ip_proto)
- {
- case IPPROTO_UDP:
- return session_table_find_tuple6(sess_mgr_rt->udp_sess_table, &key, 0);
- case IPPROTO_TCP:
- return session_manager_runtime_lookup_tcp_session(sess_mgr_rt, pkt, &key);
- default:
- return NULL;
- }
-}
-
-struct session *session_manager_runtime_lookup_session_by_id(struct session_manager_runtime *sess_mgr_rt, uint64_t sess_id)
-{
- struct session *sess = NULL;
- sess = session_table_find_sessid(sess_mgr_rt->tcp_sess_table, sess_id, 1);
- if (sess)
- {
- return sess;
- }
-
- sess = session_table_find_sessid(sess_mgr_rt->udp_sess_table, sess_id, 1);
- if (sess)
- {
- return sess;
- }
-
- return NULL;
-}
-
-int session_manager_runtime_update_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, uint64_t now_ms)
-{
- sess_mgr_rt->now_ms = now_ms;
-
- struct tuple6 key;
- if (packet_get_innermost_tuple6(pkt, &key))
- {
- return -1;
- }
- if (session_manager_runtime_bypass_duplicated_packet(sess_mgr_rt, sess, pkt, &key))
- {
- return -1;
- }
- switch (session_get_type(sess))
- {
- case SESSION_TYPE_TCP:
- return session_manager_runtime_update_tcp_session(sess_mgr_rt, sess, pkt, &key);
- case SESSION_TYPE_UDP:
- return session_manager_runtime_update_udp_session(sess_mgr_rt, sess, pkt, &key);
- default:
- return -1;
- }
-}
-
-struct session *session_manager_runtime_get_expired_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms)
-{
- sess_mgr_rt->now_ms = now_ms;
-
- struct session *sess = session_timer_expire(sess_mgr_rt->sess_timer, now_ms);
- if (sess)
- {
- enum session_state curr_state = session_get_current_state(sess);
- enum session_state next_state = session_transition_run(curr_state, TIMEOUT);
- session_transition_log(sess, curr_state, next_state, TIMEOUT);
- session_set_current_state(sess, next_state);
-
- switch (session_get_type(sess))
- {
- case SESSION_TYPE_TCP:
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
- break;
- case SESSION_TYPE_UDP:
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
- break;
- default:
- assert(0);
- break;
- }
-
- // next state is closed, need to free session
- if (next_state == SESSION_STATE_CLOSED)
- {
- if (!session_get_closing_reason(sess))
- {
- session_set_closing_reason(sess, CLOSING_BY_TIMEOUT);
- }
- return sess;
- }
- // next state is closing, only update timeout
- else
- {
- switch (session_get_type(sess))
- {
- case SESSION_TYPE_TCP:
- session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.data);
- break;
- case SESSION_TYPE_UDP:
- session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data);
- break;
- default:
- assert(0);
- break;
- }
- return NULL;
- }
- }
-
- return NULL;
-}
-
-struct session *session_manager_runtime_get_evicted_session(struct session_manager_runtime *sess_mgr_rt)
-{
- struct session *sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list);
- if (sess)
- {
- TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe);
- }
- return sess;
-}
-
-uint64_t session_manager_runtime_clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms, struct session *cleaned_sess[], uint64_t array_size)
-{
- sess_mgr_rt->now_ms = now_ms;
- struct session *sess = NULL;
- uint64_t cleaned_sess_num = 0;
- uint64_t expired_sess_num = 0;
-
- uint8_t expired_sess_canbe_clean = 0;
- if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms)
- {
- expired_sess_canbe_clean = 1;
- }
-
- for (uint64_t i = 0; i < array_size; i++)
- {
- // frist clean evicted session
- sess = session_manager_runtime_get_evicted_session(sess_mgr_rt);
- if (sess)
- {
- cleaned_sess[cleaned_sess_num++] = sess;
- }
- // then clean expired session
- else
- {
- if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rt->cfg.expire_batch_max)
- {
- sess_mgr_rt->last_clean_expired_sess_ts = now_ms;
- sess = session_manager_runtime_get_expired_session(sess_mgr_rt, now_ms);
- if (sess)
- {
- cleaned_sess[cleaned_sess_num++] = sess;
- expired_sess_num++;
- }
- else
- {
- break;
- }
- }
- else
- {
- break;
- }
- }
- }
-
- return cleaned_sess_num;
+ return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp_stream, (on_msg_cb_func *)(void *)cb, args);
}
/******************************************************************************
- * stat -- get / print
+ * session manager module
******************************************************************************/
-struct session_manager_stat *session_manager_runtime_get_stat(struct session_manager_runtime *sess_mgr_rt)
+struct stellar_module *session_manager_module_on_init(struct stellar_module_manager *mod_mgr)
{
- return &sess_mgr_rt->stat;
-}
+ struct stellar_module *pkt_mgr_mod = stellar_module_manager_get_module(mod_mgr, "packet_manager_module");
+ struct packet_manager *pkt_mgr = stellar_module_get_ctx(pkt_mgr_mod);
+ struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr);
+ const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr);
-void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr_rt)
-{
- struct session_manager_stat *stat = &sess_mgr_rt->stat;
+ struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_schema, toml_file);
+ if (sess_mgr == NULL)
+ {
+ return NULL;
+ }
- // TCP session
- SESSION_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
- stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active,
- stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed);
- // UDP session
- SESSION_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
- stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active,
- stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed);
- // evicted session
- SESSION_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted);
- // Bypassed packet
- SESSION_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu",
- stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated);
- SESSION_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu",
- stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated);
- // TCP segment
- SESSION_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu",
- stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited,
- stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered,
- stat->tcp_segs_buffered, stat->tcp_segs_freed);
-}
+ struct stellar_module *sess_mgr_mod = stellar_module_new("session_manager_module", sess_mgr);
+ if (sess_mgr_mod == NULL)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create session_manager_module");
+ session_manager_free(sess_mgr);
+ return NULL;
+ }
+ stellar_module_set_ctx(sess_mgr_mod, sess_mgr);
-/******************************************************************************
- * scan
- ******************************************************************************/
-
-static inline uint8_t ipv4_in_range(const struct in_addr *addr, const struct in_addr *start, const struct in_addr *end)
-{
- return (memcmp(addr, start, sizeof(struct in_addr)) >= 0 && memcmp(addr, end, sizeof(struct in_addr)) <= 0);
+ SESSION_MANAGER_LOG_INFO("session_manager_module initialized");
+ return sess_mgr_mod;
}
-static inline uint8_t ipv6_in_range(const struct in6_addr *addr, const struct in6_addr *start, const struct in6_addr *end)
-{
- return (memcmp(addr, start, sizeof(struct in6_addr)) >= 0 && memcmp(addr, end, sizeof(struct in6_addr)) <= 0);
-}
-
-uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess_mgr_rt, const struct session_scan_opts *opts, uint64_t mached_sess_ids[], uint64_t array_size)
-{
- uint64_t capacity = 0;
- uint64_t max_loop = 0;
- uint64_t mached_sess_num = 0;
- const struct session *sess = NULL;
- const struct tuple6 *tuple = NULL;
-
- if (sess_mgr_rt == NULL || opts == NULL || mached_sess_ids == NULL || array_size == 0)
- {
- return mached_sess_num;
- }
- if (opts->count == 0)
- {
- return mached_sess_num;
- }
- capacity = session_pool_capacity_size(sess_mgr_rt->sess_pool);
- if (opts->cursor >= capacity)
- {
- return mached_sess_num;
- }
-
- max_loop = MIN(capacity, opts->cursor + opts->count);
- for (uint64_t i = opts->cursor; i < max_loop; i++)
- {
- sess = session_pool_get0(sess_mgr_rt->sess_pool, i);
- tuple = session_get_tuple6(sess);
- if (session_get_current_state(sess) == SESSION_STATE_INIT)
- {
- continue;
- }
-
- if ((opts->flags & SESSION_SCAN_TYPE) && opts->type != session_get_type(sess))
- {
- continue;
- }
- if ((opts->flags & SESSION_SCAN_STATE) && opts->state != session_get_current_state(sess))
- {
- continue;
- }
- if ((opts->flags & SESSION_SCAN_CREATE_TIME) &&
- (session_get_timestamp(sess, SESSION_TIMESTAMP_START) < opts->create_time_ms[0] ||
- session_get_timestamp(sess, SESSION_TIMESTAMP_START) > opts->create_time_ms[1]))
- {
- continue;
- }
- if ((opts->flags & SESSION_SCAN_LAST_PKT_TIME) &&
- (session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) < opts->last_pkt_time_ms[0] ||
- session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) > opts->last_pkt_time_ms[1]))
- {
- continue;
- }
- if ((opts->flags & SESSION_SCAN_SPORT) && opts->src_port != tuple->src_port)
- {
- continue;
- }
- if ((opts->flags & SESSION_SCAN_DPORT) && opts->dst_port != tuple->dst_port)
- {
- continue;
- }
- if (opts->flags & SESSION_SCAN_SIP)
- {
- if (opts->addr_family != tuple->addr_family)
- {
- continue;
- }
- if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->src_addr.v4, &opts->src_addr[0].v4, &opts->src_addr[1].v4))
- {
- continue;
- }
- if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->src_addr.v6, &opts->src_addr[0].v6, &opts->src_addr[1].v6))
- {
- continue;
- }
- }
- if (opts->flags & SESSION_SCAN_DIP)
- {
- if (opts->addr_family != tuple->addr_family)
- {
- continue;
- }
- if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->dst_addr.v4, &opts->dst_addr[0].v4, &opts->dst_addr[1].v4))
- {
- continue;
- }
- if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->dst_addr.v6, &opts->dst_addr[0].v6, &opts->dst_addr[1].v6))
- {
- continue;
- }
- }
-
- mached_sess_ids[mached_sess_num++] = session_get_id(sess);
- if (mached_sess_num >= array_size)
- {
- break;
- }
- }
-
- SESSION_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num);
- return mached_sess_num;
-}
-
-/******************************************************************************
- * other
- ******************************************************************************/
-
-void session_set_discard(struct session *sess)
+void session_manager_module_on_exit(struct stellar_module_manager *mod_mgr, struct stellar_module *mod)
{
- struct session_manager_runtime *sess_mgr_rt = sess->sess_mgr_rt;
- enum session_type type = session_get_type(sess);
- enum session_state curr_state = session_get_current_state(sess);
- enum session_state next_state = session_transition_run(curr_state, USER_CLOSE);
- session_transition_log(sess, curr_state, next_state, USER_CLOSE);
- session_set_current_state(sess, next_state);
+ if (mod)
+ {
+ struct session_manager *sess_mgr = stellar_module_get_ctx(mod);
- switch (type)
- {
- case SESSION_TYPE_TCP:
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.discard_default);
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
- break;
- case SESSION_TYPE_UDP:
- session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default);
- SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
- break;
- default:
- assert(0);
- break;
- }
+ session_manager_free(sess_mgr);
+ stellar_module_free(mod);
+ SESSION_MANAGER_LOG_ERROR("session_manager_module exited");
+ }
} \ No newline at end of file
diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c
new file mode 100644
index 0000000..f7786a1
--- /dev/null
+++ b/infra/session_manager/session_manager_runtime.c
@@ -0,0 +1,1462 @@
+#include <time.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+
+#include "utils.h"
+#include "packet_helper.h"
+#include "packet_filter.h"
+#include "session_internal.h"
+#include "session_pool.h"
+#include "session_table.h"
+#include "session_timer.h"
+#include "session_filter.h"
+#include "session_transition.h"
+#include "session_manager_runtime.h"
+
+#define SESSION_MANAGER_RUNTIME_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__)
+#define SESSION_MANAGER_RUNTIME_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__)
+#define SESSION_MANAGER_RUNTIME_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "session manager runtime", format, ##__VA_ARGS__)
+
+struct snowflake
+{
+ uint64_t seed;
+ uint64_t sequence;
+};
+
+struct session_manager_runtime
+{
+ struct session_list evicte_list;
+ struct session_pool *sess_pool;
+ struct session_timer *sess_timer;
+ struct session_table *tcp_sess_table;
+ struct session_table *udp_sess_table;
+
+ struct packet_filter *dup_pkt_filter;
+ struct session_filter *evicte_sess_filter;
+
+ struct session_manager_stat stat;
+ struct session_manager_config cfg;
+
+ /*
+ * only used for session_set_discard() or session_manager_runtime_record_duplicated_packet(),
+ * because the function is called by plugin and has no time input.
+ */
+ uint64_t now_ms;
+ uint64_t last_clean_expired_sess_ts;
+ struct snowflake *sf;
+};
+
+#define EVICTE_SESSION_BURST (RX_BURST_MAX)
+
+/******************************************************************************
+ * session manager stat macro
+ ******************************************************************************/
+
+#define SESS_MGR_STAT_INC(stat, state, proto) \
+ { \
+ switch ((state)) \
+ { \
+ case SESSION_STATE_OPENING: \
+ (stat)->proto##_sess_opening++; \
+ break; \
+ case SESSION_STATE_ACTIVE: \
+ (stat)->proto##_sess_active++; \
+ break; \
+ case SESSION_STATE_CLOSING: \
+ (stat)->proto##_sess_closing++; \
+ break; \
+ case SESSION_STATE_DISCARD: \
+ (stat)->proto##_sess_discard++; \
+ break; \
+ case SESSION_STATE_CLOSED: \
+ (stat)->proto##_sess_closed++; \
+ break; \
+ default: \
+ break; \
+ } \
+ }
+
+#define SESS_MGR_STAT_DEC(stat, state, proto) \
+ { \
+ switch ((state)) \
+ { \
+ case SESSION_STATE_OPENING: \
+ (stat)->proto##_sess_opening--; \
+ break; \
+ case SESSION_STATE_ACTIVE: \
+ (stat)->proto##_sess_active--; \
+ break; \
+ case SESSION_STATE_CLOSING: \
+ (stat)->proto##_sess_closing--; \
+ break; \
+ case SESSION_STATE_DISCARD: \
+ (stat)->proto##_sess_discard--; \
+ break; \
+ case SESSION_STATE_CLOSED: \
+ (stat)->proto##_sess_closed--; \
+ break; \
+ default: \
+ break; \
+ } \
+ }
+
+#define SESS_MGR_STAT_UPDATE(stat, curr, next, proto) \
+ { \
+ if (curr != next) \
+ { \
+ SESS_MGR_STAT_DEC(stat, curr, proto); \
+ SESS_MGR_STAT_INC(stat, next, proto); \
+ } \
+ }
+
+/******************************************************************************
+ * snowflake
+ ******************************************************************************/
+
+static struct snowflake *snowflake_new(uint64_t seed)
+{
+ struct snowflake *sf = (struct snowflake *)calloc(1, sizeof(struct snowflake));
+ if (sf == NULL)
+ {
+ return NULL;
+ }
+
+ sf->seed = seed & 0xFFFFF;
+ sf->sequence = 0;
+
+ return sf;
+}
+
+static void snowflake_free(struct snowflake *sf)
+{
+ if (sf != NULL)
+ {
+ free(sf);
+ sf = NULL;
+ }
+}
+
+/*
+ * high -> low
+ *
+ * +------+------------------+----------------+------------------------+---------------------------+
+ * | 1bit | 12bit device_id | 8bit thread_id | 28bit timestamp in sec | 15bit sequence per thread |
+ * +------+------------------+----------------+------------------------+---------------------------+
+ */
+
+#define MAX_ID_PER_THREAD (32768)
+#define MAX_ID_BASE_TIME (268435456L)
+
+static uint64_t snowflake_generate(struct snowflake *sf, uint64_t now_sec)
+{
+ uint64_t id = 0;
+ uint64_t id_per_thread = (sf->sequence++) % MAX_ID_PER_THREAD;
+ uint64_t id_base_time = now_sec % MAX_ID_BASE_TIME;
+
+ id = (sf->seed << 43) | (id_base_time << 15) | (id_per_thread);
+
+ return id;
+}
+
+/******************************************************************************
+ * TCP utils
+ ******************************************************************************/
+
+static void tcp_clean(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
+{
+ struct tcp_reassembly *c2s_ssembler = sess->tcp_halfs[FLOW_TYPE_C2S].assembler;
+ struct tcp_reassembly *s2c_ssembler = sess->tcp_halfs[FLOW_TYPE_S2C].assembler;
+ struct tcp_segment *seg;
+ if (c2s_ssembler)
+ {
+ while ((seg = tcp_reassembly_expire(c2s_ssembler, UINT64_MAX)))
+ {
+ session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_SEGMENTS_RELEASED, 1);
+ session_inc_stat(sess, FLOW_TYPE_C2S, STAT_TCP_PAYLOADS_RELEASED, seg->len);
+ sess_mgr_rt->stat.tcp_segs_freed++;
+ tcp_segment_free(seg);
+ }
+ tcp_reassembly_free(c2s_ssembler);
+ }
+ if (s2c_ssembler)
+ {
+ while ((seg = tcp_reassembly_expire(s2c_ssembler, UINT64_MAX)))
+ {
+ session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_SEGMENTS_RELEASED, 1);
+ session_inc_stat(sess, FLOW_TYPE_S2C, STAT_TCP_PAYLOADS_RELEASED, seg->len);
+ sess_mgr_rt->stat.tcp_segs_freed++;
+ tcp_segment_free(seg);
+ }
+ tcp_reassembly_free(s2c_ssembler);
+ }
+}
+
+static int tcp_init(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
+{
+ if (!sess_mgr_rt->cfg.tcp_reassembly.enable)
+ {
+ return 0;
+ }
+
+ sess->tcp_halfs[FLOW_TYPE_C2S].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
+ sess->tcp_halfs[FLOW_TYPE_S2C].assembler = tcp_reassembly_new(sess_mgr_rt->cfg.tcp_reassembly.timeout_ms, sess_mgr_rt->cfg.tcp_reassembly.buffered_segments_max);
+ if (sess->tcp_halfs[FLOW_TYPE_C2S].assembler == NULL || sess->tcp_halfs[FLOW_TYPE_S2C].assembler == NULL)
+ {
+ tcp_clean(sess_mgr_rt, sess);
+ return -1;
+ }
+
+ SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu %s new c2s tcp assembler %p, s2c tcp assembler %p",
+ session_get_id(sess), session_get0_readable_addr(sess),
+ sess->tcp_halfs[FLOW_TYPE_C2S].assembler,
+ sess->tcp_halfs[FLOW_TYPE_S2C].assembler);
+
+ return 0;
+}
+
+static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum flow_type type, const struct layer_private *tcp_layer)
+{
+ struct tcp_segment *seg;
+ struct tcphdr *hdr = (struct tcphdr *)tcp_layer->hdr_ptr;
+ struct tcp_half *half = &sess->tcp_halfs[type];
+ uint8_t flags = tcp_hdr_get_flags(hdr);
+ uint16_t len = tcp_layer->pld_len;
+
+ if ((flags & TH_SYN) && half->isn == 0)
+ {
+ half->isn = tcp_hdr_get_seq(hdr);
+ }
+ half->flags = flags;
+ half->history |= flags;
+ half->seq = tcp_hdr_get_seq(hdr);
+ half->ack = tcp_hdr_get_ack(hdr);
+ half->len = tcp_layer->pld_len;
+
+ if (!sess_mgr_rt->cfg.tcp_reassembly.enable)
+ {
+ if (len)
+ {
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len);
+ sess_mgr_rt->stat.tcp_segs_input++;
+
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len);
+ sess_mgr_rt->stat.tcp_segs_inorder++;
+
+ half->in_order.data = tcp_layer->pld_ptr;
+ half->in_order.len = len;
+ half->in_order_ref = 0;
+ }
+ return;
+ }
+
+ if (unlikely(flags & TH_SYN))
+ {
+ // len > 0 is SYN with data (TCP Fast Open)
+ tcp_reassembly_set_recv_next(half->assembler, len ? half->seq : half->seq + 1);
+ }
+
+ seg = tcp_reassembly_expire(half->assembler, sess_mgr_rt->now_ms);
+ if (seg)
+ {
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_EXPIRED, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_EXPIRED, seg->len);
+ sess_mgr_rt->stat.tcp_segs_timeout++;
+
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RELEASED, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RELEASED, seg->len);
+ sess_mgr_rt->stat.tcp_segs_freed++;
+
+ tcp_segment_free(seg);
+ }
+
+ if (len)
+ {
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RECEIVED, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RECEIVED, len);
+ sess_mgr_rt->stat.tcp_segs_input++;
+
+ uint32_t rcv_nxt = tcp_reassembly_get_recv_next(half->assembler);
+ // in order
+ if (half->seq == rcv_nxt)
+ {
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_INORDER, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len);
+ sess_mgr_rt->stat.tcp_segs_inorder++;
+
+ half->in_order.data = tcp_layer->pld_ptr;
+ half->in_order.len = len;
+ half->in_order_ref = 0;
+ tcp_reassembly_inc_recv_next(half->assembler, len);
+ }
+ // retransmission
+ else if (uint32_before(uint32_add(half->seq, len), rcv_nxt))
+ {
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len);
+ sess_mgr_rt->stat.tcp_segs_retransmited++;
+ }
+ else if ((seg = tcp_segment_new(half->seq, tcp_layer->pld_ptr, len)))
+ {
+ switch (tcp_reassembly_push(half->assembler, seg, sess_mgr_rt->now_ms))
+ {
+ case -2:
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RETRANSMIT, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_RETRANSMIT, len);
+ sess_mgr_rt->stat.tcp_segs_retransmited++;
+ tcp_segment_free(seg);
+ break;
+ case -1:
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len);
+ sess_mgr_rt->stat.tcp_segs_omitted_too_many++;
+ tcp_segment_free(seg);
+ break;
+ case 0:
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len);
+ sess_mgr_rt->stat.tcp_segs_buffered++;
+ break;
+ case 1:
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_OVERLAP, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_OVERLAP, len);
+ sess_mgr_rt->stat.tcp_segs_overlapped++;
+
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_BUFFERED, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_BUFFERED, len);
+ sess_mgr_rt->stat.tcp_segs_buffered++;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+ else
+ {
+ session_inc_stat(sess, type, STAT_TCP_SEGMENTS_NOSPACE, 1);
+ session_inc_stat(sess, type, STAT_TCP_PAYLOADS_NOSPACE, len);
+ sess_mgr_rt->stat.tcp_segs_omitted_too_many++;
+ }
+ }
+}
+
+/******************************************************************************
+ * session flow
+ ******************************************************************************/
+
+static enum flow_type identify_flow_type_by_port(uint16_t src_port, uint16_t dst_port)
+{
+ // big port is client
+ if (src_port > dst_port)
+ {
+ return FLOW_TYPE_C2S;
+ }
+ else if (src_port < dst_port)
+ {
+ return FLOW_TYPE_S2C;
+ }
+ else
+ {
+ // if port is equal, first packet is C2S
+ return FLOW_TYPE_C2S;
+ }
+}
+
+static enum flow_type identify_flow_type_by_history(const struct session *sess, const struct tuple6 *key)
+{
+ if (tuple6_cmp(session_get_tuple6(sess), key) == 0)
+ {
+ return FLOW_TYPE_C2S;
+ }
+ else
+ {
+ return FLOW_TYPE_S2C;
+ }
+}
+
+/******************************************************************************
+ * bypass packet -- table limit / session evicted / duplicated packet
+ ******************************************************************************/
+
+static int session_manager_runtime_bypass_packet_on_tcp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key)
+{
+ if (key->ip_proto == IPPROTO_TCP && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max)
+ {
+ sess_mgr_rt->stat.tcp_pkts_bypass_table_full++;
+ return 1;
+ }
+ return 0;
+}
+
+static int session_manager_runtime_bypass_packet_on_udp_table_limit(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key)
+{
+ if (key->ip_proto == IPPROTO_UDP && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max)
+ {
+ sess_mgr_rt->stat.udp_pkts_bypass_table_full++;
+ return 1;
+ }
+ return 0;
+}
+
+static int session_manager_runtime_bypass_packet_on_session_evicted(struct session_manager_runtime *sess_mgr_rt, const struct tuple6 *key)
+{
+ if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable && session_filter_lookup(sess_mgr_rt->evicte_sess_filter, key, sess_mgr_rt->now_ms))
+ {
+ sess_mgr_rt->stat.udp_pkts_bypass_session_evicted++;
+ return 1;
+ }
+
+ return 0;
+}
+
+static int session_manager_runtime_bypass_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
+{
+ if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable == 0)
+ {
+ return 0;
+ }
+
+ enum flow_type type = identify_flow_type_by_history(sess, key);
+ if (session_get_stat(sess, type, STAT_RAW_PACKETS_RECEIVED) < 3 || session_has_duplicate_traffic(sess))
+ {
+ if (packet_filter_lookup(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms))
+ {
+ session_inc_stat(sess, type, STAT_DUPLICATE_PACKETS_BYPASS, 1);
+ session_inc_stat(sess, type, STAT_DUPLICATE_BYTES_BYPASS, packet_get_raw_len(pkt));
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ sess_mgr_rt->stat.tcp_pkts_bypass_duplicated++;
+ break;
+ case SESSION_TYPE_UDP:
+ sess_mgr_rt->stat.udp_pkts_bypass_duplicated++;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ session_set_duplicate_traffic(sess);
+
+ session_set_current_packet(sess, pkt);
+ session_set_flow_type(sess, type);
+ return 1;
+ }
+ else
+ {
+ packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms);
+ return 0;
+ }
+ }
+
+ return 0;
+}
+
+void session_manager_runtime_record_duplicated_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt)
+{
+ if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
+ {
+ packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms);
+ }
+}
+
+/******************************************************************************
+ * config -- new / free / print
+ ******************************************************************************/
+
+struct session_manager_config *session_manager_config_new(const char *toml_file)
+{
+ if (toml_file == NULL)
+ {
+ return NULL;
+ }
+
+ struct session_manager_config *sess_mgr_cfg = (struct session_manager_config *)calloc(1, sizeof(struct session_manager_config));
+ if (sess_mgr_cfg == NULL)
+ {
+ return NULL;
+ }
+
+ int ret = 0;
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_session_max", (uint64_t *)&sess_mgr_cfg->tcp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_session_max", (uint64_t *)&sess_mgr_cfg->udp_session_max, EVICTE_SESSION_BURST * 2, UINT64_MAX);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_tcp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_tcp_table_limit, 0, 1);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evict_old_on_udp_table_limit", (uint64_t *)&sess_mgr_cfg->evict_old_on_udp_table_limit, 0, 1);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_period_ms", (uint64_t *)&sess_mgr_cfg->expire_period_ms, 0, 60000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.expire_batch_max", (uint64_t *)&sess_mgr_cfg->expire_batch_max, 1, 1024);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.init", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.init, 1, 60000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.handshake", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.handshake, 1, 60000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.data, 1, 15999999000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.half_closed", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.half_closed, 1, 604800000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.time_wait", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.time_wait, 1, 60000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.discard_default, 1, 15999999000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_timeout_ms.unverified_rst", (uint64_t *)&sess_mgr_cfg->tcp_timeout_ms.unverified_rst, 1, 60000);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.data", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.data, 1, 15999999000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.udp_timeout_ms.discard_default", (uint64_t *)&sess_mgr_cfg->udp_timeout_ms.discard_default, 1, 15999999000);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.enable, 0, 1);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.capacity, 1, 4294967295);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.duplicated_packet_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms, 1, 60000);
+ ret += load_and_validate_toml_double_config(toml_file, "session_manager.duplicated_packet_bloom_filter.error_rate", (double *)&sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate, 0.0, 1.0);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.enable", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.enable, 0, 1);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.capacity", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.capacity, 1, 4294967295);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.evicted_session_bloom_filter.time_window_ms", (uint64_t *)&sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms, 1, 60000);
+ ret += load_and_validate_toml_double_config(toml_file, "session_manager.evicted_session_bloom_filter.error_rate", (double *)&sess_mgr_cfg->evicted_session_bloom_filter.error_rate, 0.0, 1.0);
+
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.enable", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.enable, 0, 1);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.timeout_ms", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.timeout_ms, 1, 60000);
+ ret += load_and_validate_toml_integer_config(toml_file, "session_manager.tcp_reassembly.buffered_segments_max", (uint64_t *)&sess_mgr_cfg->tcp_reassembly.buffered_segments_max, 1, 512);
+
+ if (ret != 0)
+ {
+ session_manager_config_free(sess_mgr_cfg);
+ return NULL;
+ }
+
+ return sess_mgr_cfg;
+}
+
+void session_manager_config_free(struct session_manager_config *sess_mgr_cfg)
+{
+ if (sess_mgr_cfg)
+ {
+ free(sess_mgr_cfg);
+ sess_mgr_cfg = NULL;
+ }
+}
+
+void session_manager_config_print(struct session_manager_config *sess_mgr_cfg)
+{
+ if (sess_mgr_cfg)
+ {
+ // max session number
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_session_max : %lu", sess_mgr_cfg->tcp_session_max);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_session_max : %lu", sess_mgr_cfg->udp_session_max);
+
+ // session overload
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_tcp_table_limit : %d", sess_mgr_cfg->evict_old_on_tcp_table_limit);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evict_old_on_udp_table_limit : %d", sess_mgr_cfg->evict_old_on_udp_table_limit);
+
+ // TCP timeout
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.init : %lu", sess_mgr_cfg->tcp_timeout_ms.init);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.handshake : %lu", sess_mgr_cfg->tcp_timeout_ms.handshake);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.data : %lu", sess_mgr_cfg->tcp_timeout_ms.data);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.half_closed : %lu", sess_mgr_cfg->tcp_timeout_ms.half_closed);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.time_wait : %lu", sess_mgr_cfg->tcp_timeout_ms.time_wait);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.discard_default : %lu", sess_mgr_cfg->tcp_timeout_ms.discard_default);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_timeout_ms.unverified_rst : %lu", sess_mgr_cfg->tcp_timeout_ms.unverified_rst);
+
+ // UDP timeout
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.data : %lu", sess_mgr_cfg->udp_timeout_ms.data);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.udp_timeout_ms.discard_default : %lu", sess_mgr_cfg->udp_timeout_ms.discard_default);
+
+ // limit
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_period_ms : %lu", sess_mgr_cfg->expire_period_ms);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.expire_batch_max : %lu", sess_mgr_cfg->expire_batch_max);
+
+ // duplicated packet filter
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.enable : %d", sess_mgr_cfg->duplicated_packet_bloom_filter.enable);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.capacity : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.capacity);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->duplicated_packet_bloom_filter.time_window_ms);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.duplicated_packet_bloom_filter.error_rate : %f", sess_mgr_cfg->duplicated_packet_bloom_filter.error_rate);
+
+ // eviction session filter
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.enable : %d", sess_mgr_cfg->evicted_session_bloom_filter.enable);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.capacity : %lu", sess_mgr_cfg->evicted_session_bloom_filter.capacity);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.time_window_ms : %lu", sess_mgr_cfg->evicted_session_bloom_filter.time_window_ms);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.evicted_session_bloom_filter.error_rate : %f", sess_mgr_cfg->evicted_session_bloom_filter.error_rate);
+
+ // TCP reassembly
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.enable : %d", sess_mgr_cfg->tcp_reassembly.enable);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.timeout_ms : %lu", sess_mgr_cfg->tcp_reassembly.timeout_ms);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("session_manager.tcp_reassembly.buffered_segments_max : %lu", sess_mgr_cfg->tcp_reassembly.buffered_segments_max);
+ }
+}
+
+/******************************************************************************
+ * runtime -- new / free
+ ******************************************************************************/
+
+struct session_manager_runtime *session_manager_runtime_new(const struct session_manager_config *sess_mgr_cfg, uint64_t now_ms)
+{
+ struct session_manager_runtime *sess_mgr_rt = (struct session_manager_runtime *)calloc(1, sizeof(struct session_manager_runtime));
+ if (sess_mgr_rt == NULL)
+ {
+ return NULL;
+ }
+ memcpy(&sess_mgr_rt->cfg, sess_mgr_cfg, sizeof(struct session_manager_config));
+
+ sess_mgr_rt->sess_pool = session_pool_new(sess_mgr_rt->cfg.tcp_session_max + sess_mgr_rt->cfg.udp_session_max);
+ sess_mgr_rt->tcp_sess_table = session_table_new();
+ sess_mgr_rt->udp_sess_table = session_table_new();
+ sess_mgr_rt->sess_timer = session_timer_new(now_ms);
+ if (sess_mgr_rt->sess_pool == NULL || sess_mgr_rt->tcp_sess_table == NULL || sess_mgr_rt->udp_sess_table == NULL || sess_mgr_rt->sess_timer == NULL)
+ {
+ goto error;
+ }
+ if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
+ {
+ sess_mgr_rt->evicte_sess_filter = session_filter_new(sess_mgr_rt->cfg.evicted_session_bloom_filter.capacity,
+ sess_mgr_rt->cfg.evicted_session_bloom_filter.time_window_ms,
+ sess_mgr_rt->cfg.evicted_session_bloom_filter.error_rate, now_ms);
+ if (sess_mgr_rt->evicte_sess_filter == NULL)
+ {
+ goto error;
+ }
+ }
+ if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
+ {
+ sess_mgr_rt->dup_pkt_filter = packet_filter_new(sess_mgr_rt->cfg.duplicated_packet_bloom_filter.capacity,
+ sess_mgr_rt->cfg.duplicated_packet_bloom_filter.time_window_ms,
+ sess_mgr_rt->cfg.duplicated_packet_bloom_filter.error_rate, now_ms);
+ if (sess_mgr_rt->dup_pkt_filter == NULL)
+ {
+ goto error;
+ }
+ }
+ sess_mgr_rt->sf = snowflake_new(sess_mgr_rt->cfg.session_id_seed);
+ if (sess_mgr_rt->sf == NULL)
+ {
+ goto error;
+ }
+
+ TAILQ_INIT(&sess_mgr_rt->evicte_list);
+ session_transition_init();
+ sess_mgr_rt->now_ms = now_ms;
+ sess_mgr_rt->last_clean_expired_sess_ts = now_ms;
+
+ return sess_mgr_rt;
+
+error:
+ session_manager_runtime_free(sess_mgr_rt);
+ return NULL;
+}
+
+void session_manager_runtime_free(struct session_manager_runtime *sess_mgr_rt)
+{
+ struct session *sess;
+ if (sess_mgr_rt)
+ {
+ // free all evicted session
+ while ((sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list)))
+ {
+ TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe);
+ session_manager_runtime_free_session(sess_mgr_rt, sess);
+ }
+ // free all udp session
+ while (sess_mgr_rt->udp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->udp_sess_table)))
+ {
+ session_manager_runtime_free_session(sess_mgr_rt, sess);
+ }
+ // free all tcp session
+ while (sess_mgr_rt->tcp_sess_table && (sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table)))
+ {
+ session_manager_runtime_free_session(sess_mgr_rt, sess);
+ }
+ if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
+ {
+ session_filter_free(sess_mgr_rt->evicte_sess_filter);
+ }
+ if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
+ {
+ packet_filter_free(sess_mgr_rt->dup_pkt_filter);
+ }
+ snowflake_free(sess_mgr_rt->sf);
+ session_timer_free(sess_mgr_rt->sess_timer);
+ session_table_free(sess_mgr_rt->udp_sess_table);
+ session_table_free(sess_mgr_rt->tcp_sess_table);
+ session_pool_free(sess_mgr_rt->sess_pool);
+ free(sess_mgr_rt);
+ sess_mgr_rt = NULL;
+ }
+}
+
+/******************************************************************************
+ * session -- new / free / lookup / updata / expire / evicte / clean
+ ******************************************************************************/
+
+static void session_update(struct session_manager_runtime *sess_mgr_rt, struct session *sess, enum session_state next_state, const struct packet *pkt, const struct tuple6 *key, enum flow_type type)
+{
+ if (session_get_current_state(sess) == SESSION_STATE_INIT)
+ {
+ uint64_t sess_id = snowflake_generate(sess_mgr_rt->sf, sess_mgr_rt->now_ms / 1000);
+ session_set_id(sess, sess_id);
+ enum packet_direction pkt_dir = packet_get_direction(pkt);
+ if (type == FLOW_TYPE_C2S)
+ {
+ session_set_tuple6(sess, key);
+ if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External
+ {
+ session_set_direction(sess, SESSION_DIRECTION_OUTBOUND);
+ }
+ else
+ {
+ session_set_direction(sess, SESSION_DIRECTION_INBOUND);
+ }
+ tuple6_to_str(key, sess->tuple_str, sizeof(sess->tuple_str));
+ }
+ else
+ {
+ struct tuple6 out;
+ tuple6_reverse(key, &out);
+ session_set_tuple6(sess, &out);
+ if (pkt_dir == PACKET_DIRECTION_OUTGOING) // Internal -> External
+ {
+ session_set_direction(sess, SESSION_DIRECTION_INBOUND);
+ }
+ else
+ {
+ session_set_direction(sess, SESSION_DIRECTION_OUTBOUND);
+ }
+ tuple6_to_str(&out, sess->tuple_str, sizeof(sess->tuple_str));
+ }
+
+ session_set_timestamp(sess, SESSION_TIMESTAMP_START, sess_mgr_rt->now_ms);
+ switch (key->ip_proto)
+ {
+ case IPPROTO_TCP:
+ session_set_type(sess, SESSION_TYPE_TCP);
+ break;
+ case IPPROTO_UDP:
+ session_set_type(sess, SESSION_TYPE_UDP);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ }
+
+ session_inc_stat(sess, type, STAT_RAW_PACKETS_RECEIVED, 1);
+ session_inc_stat(sess, type, STAT_RAW_BYTES_RECEIVED, packet_get_raw_len(pkt));
+
+ if (!session_get_first_packet(sess, type))
+ {
+ session_set_first_packet(sess, type, packet_dup(pkt));
+ session_set_route_ctx(sess, type, packet_get_route_ctx(pkt));
+ session_set_sids(sess, type, packet_get_sids(pkt));
+ }
+
+ session_set_current_packet(sess, pkt);
+ session_set_flow_type(sess, type);
+ session_set_timestamp(sess, SESSION_TIMESTAMP_LAST, sess_mgr_rt->now_ms);
+ session_set_current_state(sess, next_state);
+}
+
+static void session_manager_runtime_evicte_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, int reason)
+{
+ if (sess == NULL)
+ {
+ return;
+ }
+
+ // when session add to evicted queue, session lifetime is over
+ enum session_state curr_state = session_get_current_state(sess);
+ enum session_state next_state = session_transition_run(curr_state, reason);
+ session_transition_log(sess, curr_state, next_state, reason);
+ session_set_current_state(sess, next_state);
+ if (!session_get_closing_reason(sess))
+ {
+ if (reason == PORT_REUSE_EVICT)
+ {
+ session_set_closing_reason(sess, CLOSING_BY_PORT_REUSE_EVICTED);
+ }
+ if (reason == LRU_EVICT)
+ {
+ session_set_closing_reason(sess, CLOSING_BY_LRU_EVICTED);
+ }
+ }
+ session_timer_del(sess_mgr_rt->sess_timer, sess);
+ TAILQ_INSERT_TAIL(&sess_mgr_rt->evicte_list, sess, evicte_tqe);
+
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte tcp old session: %lu", session_get_id(sess));
+ session_table_del(sess_mgr_rt->tcp_sess_table, sess);
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
+ sess_mgr_rt->stat.tcp_sess_evicted++;
+ break;
+ case SESSION_TYPE_UDP:
+ SESSION_MANAGER_RUNTIME_LOG_DEBUG("evicte udp old session: %lu", session_get_id(sess));
+ session_table_del(sess_mgr_rt->udp_sess_table, sess);
+ if (sess_mgr_rt->cfg.evicted_session_bloom_filter.enable)
+ {
+ session_filter_add(sess_mgr_rt->evicte_sess_filter, session_get_tuple6(sess), sess_mgr_rt->now_ms);
+ }
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
+ sess_mgr_rt->stat.udp_sess_evicted++;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+}
+
+static struct session *session_manager_runtime_lookup_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key)
+{
+ struct session *sess = session_table_find_tuple6(sess_mgr_rt->tcp_sess_table, key, 0);
+ if (sess == NULL)
+ {
+ return NULL;
+ }
+
+ const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP);
+ const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
+ uint8_t flags = tcp_hdr_get_flags(hdr);
+ if ((flags & TH_SYN) == 0)
+ {
+ return sess;
+ }
+
+ enum flow_type type = identify_flow_type_by_history(sess, key);
+ struct tcp_half *half = &sess->tcp_halfs[type];
+ if ((half->isn && half->isn != tcp_hdr_get_seq(hdr)) || // recv SYN with different ISN
+ ((half->history & TH_FIN) || (half->history & TH_RST))) // recv SYN after FIN or RST
+ {
+ // TCP port reuse, evict old session
+ session_manager_runtime_evicte_session(sess_mgr_rt, sess, PORT_REUSE_EVICT);
+ return NULL;
+ }
+ else
+ {
+ // TCP SYN retransmission
+ return sess;
+ }
+}
+
+static struct session *session_manager_runtime_new_tcp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key)
+{
+ const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP);
+ const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
+ uint8_t flags = tcp_hdr_get_flags(hdr);
+ if (!(flags & TH_SYN))
+ {
+ sess_mgr_rt->stat.tcp_pkts_bypass_session_not_found++;
+ return NULL;
+ }
+
+ // tcp table full evict old session
+ if (sess_mgr_rt->cfg.evict_old_on_tcp_table_limit && sess_mgr_rt->stat.tcp_sess_used >= sess_mgr_rt->cfg.tcp_session_max - EVICTE_SESSION_BURST)
+ {
+ struct session *evic_sess = session_table_find_lru(sess_mgr_rt->tcp_sess_table);
+ session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT);
+ }
+
+ enum flow_type type = (flags & TH_ACK) ? FLOW_TYPE_S2C : FLOW_TYPE_C2S;
+ struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool);
+ if (sess == NULL)
+ {
+ assert(0);
+ return NULL;
+ }
+ session_init(sess);
+ sess->sess_mgr_rt = sess_mgr_rt;
+ sess->sess_mgr_stat = &sess_mgr_rt->stat;
+
+ enum session_state next_state = session_transition_run(SESSION_STATE_INIT, TCP_SYN);
+ session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
+ session_transition_log(sess, SESSION_STATE_INIT, next_state, TCP_SYN);
+
+ if (tcp_init(sess_mgr_rt, sess) == -1)
+ {
+ assert(0);
+ session_pool_push(sess_mgr_rt->sess_pool, sess);
+ return NULL;
+ }
+ tcp_update(sess_mgr_rt, sess, type, tcp_layer);
+
+ uint64_t timeout = (flags & TH_ACK) ? sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init;
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout);
+ session_table_add(sess_mgr_rt->tcp_sess_table, sess);
+
+ if (sess_mgr_rt->cfg.duplicated_packet_bloom_filter.enable)
+ {
+ packet_filter_add(sess_mgr_rt->dup_pkt_filter, pkt, sess_mgr_rt->now_ms);
+ }
+
+ SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, tcp);
+ sess_mgr_rt->stat.tcp_sess_used++;
+ sess_mgr_rt->stat.history_tcp_sessions++;
+
+ return sess;
+}
+
+static struct session *session_manager_runtime_new_udp_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, const struct tuple6 *key)
+{
+ // udp table full evict old session
+ if (sess_mgr_rt->cfg.evict_old_on_udp_table_limit && sess_mgr_rt->stat.udp_sess_used >= sess_mgr_rt->cfg.udp_session_max - EVICTE_SESSION_BURST)
+ {
+ struct session *evic_sess = session_table_find_lru(sess_mgr_rt->udp_sess_table);
+ session_manager_runtime_evicte_session(sess_mgr_rt, evic_sess, LRU_EVICT);
+ }
+
+ struct session *sess = session_pool_pop(sess_mgr_rt->sess_pool);
+ if (sess == NULL)
+ {
+ assert(sess);
+ return NULL;
+ }
+ session_init(sess);
+ sess->sess_mgr_rt = sess_mgr_rt;
+ sess->sess_mgr_stat = &sess_mgr_rt->stat;
+
+ enum flow_type type = identify_flow_type_by_port(ntohs(key->src_port), ntohs(key->dst_port));
+ enum session_state next_state = session_transition_run(SESSION_STATE_INIT, UDP_DATA);
+ session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
+ session_transition_log(sess, SESSION_STATE_INIT, next_state, UDP_DATA);
+
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data);
+ session_table_add(sess_mgr_rt->udp_sess_table, sess);
+
+ SESS_MGR_STAT_INC(&sess_mgr_rt->stat, next_state, udp);
+ sess_mgr_rt->stat.udp_sess_used++;
+ sess_mgr_rt->stat.history_udp_sessions++;
+
+ return sess;
+}
+
+static int session_manager_runtime_update_tcp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
+{
+ const struct layer_private *tcp_layer = packet_get_innermost_layer(pkt, LAYER_PROTO_TCP);
+ const struct tcphdr *hdr = (const struct tcphdr *)tcp_layer->hdr_ptr;
+ enum flow_type type = identify_flow_type_by_history(sess, key);
+ uint8_t flags = tcp_hdr_get_flags(hdr);
+ int inputs = 0;
+ inputs |= (flags & TH_SYN) ? TCP_SYN : NONE;
+ inputs |= (flags & TH_FIN) ? TCP_FIN : NONE;
+ inputs |= (flags & TH_RST) ? TCP_RST : NONE;
+ inputs |= tcp_layer->pld_len ? TCP_DATA : NONE;
+
+ // update state
+ enum session_state curr_state = session_get_current_state(sess);
+ enum session_state next_state = session_transition_run(curr_state, inputs);
+
+ // update session
+ session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
+ session_transition_log(sess, curr_state, next_state, inputs);
+
+ // update tcp
+ tcp_update(sess_mgr_rt, sess, type, tcp_layer);
+
+ // set closing reason
+ if (next_state == SESSION_STATE_CLOSING && !session_get_closing_reason(sess))
+ {
+ if (flags & TH_FIN)
+ {
+ session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_FIN : CLOSING_BY_SERVER_FIN));
+ }
+ if (flags & TH_RST)
+ {
+ session_set_closing_reason(sess, (type == FLOW_TYPE_C2S ? CLOSING_BY_CLIENT_RST : CLOSING_BY_SERVER_RST));
+ }
+ }
+
+ // update timeout
+ struct tcp_half *curr = &sess->tcp_halfs[type];
+ struct tcp_half *peer = &sess->tcp_halfs[(type == FLOW_TYPE_C2S ? FLOW_TYPE_S2C : FLOW_TYPE_C2S)];
+ uint64_t timeout = 0;
+ switch (next_state)
+ {
+ case SESSION_STATE_OPENING:
+ if (flags & TH_SYN)
+ {
+ timeout = (flags & TH_ACK) ? sess_mgr_rt->cfg.tcp_timeout_ms.handshake : sess_mgr_rt->cfg.tcp_timeout_ms.init;
+ }
+ else
+ {
+ timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data;
+ }
+ break;
+ case SESSION_STATE_ACTIVE:
+ timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data;
+ break;
+ case SESSION_STATE_CLOSING:
+ if (flags & TH_FIN)
+ {
+ timeout = (peer->history & TH_FIN) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.half_closed;
+ }
+ else if (flags & TH_RST)
+ {
+ // if fin is received, the expected sequence number should be increased by 1
+ uint32_t expected = (peer->history & TH_FIN) ? peer->ack + 1 : peer->ack;
+ timeout = (expected == curr->seq) ? sess_mgr_rt->cfg.tcp_timeout_ms.time_wait : sess_mgr_rt->cfg.tcp_timeout_ms.unverified_rst;
+ }
+ else
+ {
+ timeout = sess_mgr_rt->cfg.tcp_timeout_ms.data;
+ }
+ break;
+ case SESSION_STATE_DISCARD:
+ timeout = sess_mgr_rt->cfg.tcp_timeout_ms.discard_default;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + timeout);
+
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
+
+ return 0;
+}
+
+static int session_manager_runtime_update_udp_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, const struct tuple6 *key)
+{
+ enum flow_type type = identify_flow_type_by_history(sess, key);
+ enum session_state curr_state = session_get_current_state(sess);
+ enum session_state next_state = session_transition_run(curr_state, UDP_DATA);
+ session_update(sess_mgr_rt, sess, next_state, pkt, key, type);
+ session_transition_log(sess, curr_state, next_state, UDP_DATA);
+
+ if (session_get_current_state(sess) == SESSION_STATE_DISCARD)
+ {
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default);
+ }
+ else
+ {
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data);
+ }
+
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
+
+ return 0;
+}
+
+struct session *session_manager_runtime_new_session(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt, uint64_t now_ms)
+{
+ sess_mgr_rt->now_ms = now_ms;
+
+ struct tuple6 key;
+ if (packet_get_innermost_tuple6(pkt, &key))
+ {
+ return NULL;
+ }
+ switch (key.ip_proto)
+ {
+ case IPPROTO_TCP:
+ if (session_manager_runtime_bypass_packet_on_tcp_table_limit(sess_mgr_rt, &key))
+ {
+ return NULL;
+ }
+ return session_manager_runtime_new_tcp_session(sess_mgr_rt, pkt, &key);
+ case IPPROTO_UDP:
+ if (session_manager_runtime_bypass_packet_on_session_evicted(sess_mgr_rt, &key))
+ {
+ return NULL;
+ }
+ if (session_manager_runtime_bypass_packet_on_udp_table_limit(sess_mgr_rt, &key))
+ {
+ return NULL;
+ }
+ return session_manager_runtime_new_udp_session(sess_mgr_rt, pkt, &key);
+ default:
+ return NULL;
+ }
+}
+
+void session_manager_runtime_free_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess)
+{
+ if (sess)
+ {
+ SESSION_MANAGER_RUNTIME_LOG_DEBUG("session %lu closed (%s)", session_get_id(sess), closing_reason_to_str(session_get_closing_reason(sess)));
+
+ session_timer_del(sess_mgr_rt->sess_timer, sess);
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ tcp_clean(sess_mgr_rt, sess);
+ if (session_table_find_sessid(sess_mgr_rt->tcp_sess_table, session_get_id(sess), 0) == sess)
+ {
+ session_table_del(sess_mgr_rt->tcp_sess_table, sess);
+ }
+ SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), tcp);
+ sess_mgr_rt->stat.tcp_sess_used--;
+ break;
+ case SESSION_TYPE_UDP:
+ if (session_table_find_sessid(sess_mgr_rt->udp_sess_table, session_get_id(sess), 0) == sess)
+ {
+ session_table_del(sess_mgr_rt->udp_sess_table, sess);
+ }
+ SESS_MGR_STAT_DEC(&sess_mgr_rt->stat, session_get_current_state(sess), udp);
+ sess_mgr_rt->stat.udp_sess_used--;
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_C2S));
+ packet_free((struct packet *)session_get_first_packet(sess, FLOW_TYPE_S2C));
+ session_set_first_packet(sess, FLOW_TYPE_C2S, NULL);
+ session_set_first_packet(sess, FLOW_TYPE_S2C, NULL);
+ session_clear_route_ctx(sess, FLOW_TYPE_C2S);
+ session_clear_route_ctx(sess, FLOW_TYPE_S2C);
+ session_clear_sids(sess, FLOW_TYPE_C2S);
+ session_clear_sids(sess, FLOW_TYPE_S2C);
+ session_set_current_state(sess, SESSION_STATE_INIT);
+ session_set_current_packet(sess, NULL);
+ session_set_flow_type(sess, FLOW_TYPE_NONE);
+ session_pool_push(sess_mgr_rt->sess_pool, sess);
+ sess = NULL;
+ }
+}
+
+struct session *session_manager_runtime_lookup_session_by_packet(struct session_manager_runtime *sess_mgr_rt, const struct packet *pkt)
+{
+ struct tuple6 key;
+ if (packet_get_innermost_tuple6(pkt, &key))
+ {
+ return NULL;
+ }
+ switch (key.ip_proto)
+ {
+ case IPPROTO_UDP:
+ return session_table_find_tuple6(sess_mgr_rt->udp_sess_table, &key, 0);
+ case IPPROTO_TCP:
+ return session_manager_runtime_lookup_tcp_session(sess_mgr_rt, pkt, &key);
+ default:
+ return NULL;
+ }
+}
+
+struct session *session_manager_runtime_lookup_session_by_id(struct session_manager_runtime *sess_mgr_rt, uint64_t sess_id)
+{
+ struct session *sess = NULL;
+ sess = session_table_find_sessid(sess_mgr_rt->tcp_sess_table, sess_id, 1);
+ if (sess)
+ {
+ return sess;
+ }
+
+ sess = session_table_find_sessid(sess_mgr_rt->udp_sess_table, sess_id, 1);
+ if (sess)
+ {
+ return sess;
+ }
+
+ return NULL;
+}
+
+int session_manager_runtime_update_session(struct session_manager_runtime *sess_mgr_rt, struct session *sess, const struct packet *pkt, uint64_t now_ms)
+{
+ sess_mgr_rt->now_ms = now_ms;
+
+ struct tuple6 key;
+ if (packet_get_innermost_tuple6(pkt, &key))
+ {
+ return -1;
+ }
+ if (session_manager_runtime_bypass_duplicated_packet(sess_mgr_rt, sess, pkt, &key))
+ {
+ return -1;
+ }
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ return session_manager_runtime_update_tcp_session(sess_mgr_rt, sess, pkt, &key);
+ case SESSION_TYPE_UDP:
+ return session_manager_runtime_update_udp_session(sess_mgr_rt, sess, pkt, &key);
+ default:
+ return -1;
+ }
+}
+
+struct session *session_manager_runtime_get_expired_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms)
+{
+ sess_mgr_rt->now_ms = now_ms;
+
+ struct session *sess = session_timer_expire(sess_mgr_rt->sess_timer, now_ms);
+ if (sess)
+ {
+ enum session_state curr_state = session_get_current_state(sess);
+ enum session_state next_state = session_transition_run(curr_state, TIMEOUT);
+ session_transition_log(sess, curr_state, next_state, TIMEOUT);
+ session_set_current_state(sess, next_state);
+
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
+ break;
+ case SESSION_TYPE_UDP:
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+
+ // next state is closed, need to free session
+ if (next_state == SESSION_STATE_CLOSED)
+ {
+ if (!session_get_closing_reason(sess))
+ {
+ session_set_closing_reason(sess, CLOSING_BY_TIMEOUT);
+ }
+ return sess;
+ }
+ // next state is closing, only update timeout
+ else
+ {
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.data);
+ break;
+ case SESSION_TYPE_UDP:
+ session_timer_update(sess_mgr_rt->sess_timer, sess, now_ms + sess_mgr_rt->cfg.udp_timeout_ms.data);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+ return NULL;
+ }
+ }
+
+ return NULL;
+}
+
+struct session *session_manager_runtime_get_evicted_session(struct session_manager_runtime *sess_mgr_rt)
+{
+ struct session *sess = TAILQ_FIRST(&sess_mgr_rt->evicte_list);
+ if (sess)
+ {
+ TAILQ_REMOVE(&sess_mgr_rt->evicte_list, sess, evicte_tqe);
+ }
+ return sess;
+}
+
+uint64_t session_manager_runtime_clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms, struct session *cleaned_sess[], uint64_t array_size)
+{
+ sess_mgr_rt->now_ms = now_ms;
+ struct session *sess = NULL;
+ uint64_t cleaned_sess_num = 0;
+ uint64_t expired_sess_num = 0;
+
+ uint8_t expired_sess_canbe_clean = 0;
+ if (now_ms - sess_mgr_rt->last_clean_expired_sess_ts >= sess_mgr_rt->cfg.expire_period_ms)
+ {
+ expired_sess_canbe_clean = 1;
+ }
+
+ for (uint64_t i = 0; i < array_size; i++)
+ {
+ // frist clean evicted session
+ sess = session_manager_runtime_get_evicted_session(sess_mgr_rt);
+ if (sess)
+ {
+ cleaned_sess[cleaned_sess_num++] = sess;
+ }
+ // then clean expired session
+ else
+ {
+ if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rt->cfg.expire_batch_max)
+ {
+ sess_mgr_rt->last_clean_expired_sess_ts = now_ms;
+ sess = session_manager_runtime_get_expired_session(sess_mgr_rt, now_ms);
+ if (sess)
+ {
+ cleaned_sess[cleaned_sess_num++] = sess;
+ expired_sess_num++;
+ }
+ else
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ return cleaned_sess_num;
+}
+
+/******************************************************************************
+ * stat -- get / print
+ ******************************************************************************/
+
+struct session_manager_stat *session_manager_runtime_get_stat(struct session_manager_runtime *sess_mgr_rt)
+{
+ return &sess_mgr_rt->stat;
+}
+
+void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr_rt)
+{
+ struct session_manager_stat *stat = &sess_mgr_rt->stat;
+
+ // TCP session
+ SESSION_MANAGER_RUNTIME_LOG_INFO("TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
+ stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active,
+ stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed);
+ // UDP session
+ SESSION_MANAGER_RUNTIME_LOG_INFO("UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
+ stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active,
+ stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed);
+ // evicted session
+ SESSION_MANAGER_RUNTIME_LOG_INFO("evicted session: TCP=%lu, UDP=%lu", stat->tcp_sess_evicted, stat->udp_sess_evicted);
+ // Bypassed packet
+ SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu",
+ stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated);
+ SESSION_MANAGER_RUNTIME_LOG_INFO("bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu",
+ stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated);
+ // TCP segment
+ SESSION_MANAGER_RUNTIME_LOG_INFO("TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu",
+ stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited,
+ stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered,
+ stat->tcp_segs_buffered, stat->tcp_segs_freed);
+}
+
+/******************************************************************************
+ * scan
+ ******************************************************************************/
+
+static inline uint8_t ipv4_in_range(const struct in_addr *addr, const struct in_addr *start, const struct in_addr *end)
+{
+ return (memcmp(addr, start, sizeof(struct in_addr)) >= 0 && memcmp(addr, end, sizeof(struct in_addr)) <= 0);
+}
+
+static inline uint8_t ipv6_in_range(const struct in6_addr *addr, const struct in6_addr *start, const struct in6_addr *end)
+{
+ return (memcmp(addr, start, sizeof(struct in6_addr)) >= 0 && memcmp(addr, end, sizeof(struct in6_addr)) <= 0);
+}
+
+uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess_mgr_rt, const struct session_scan_opts *opts, uint64_t mached_sess_ids[], uint64_t array_size)
+{
+ uint64_t capacity = 0;
+ uint64_t max_loop = 0;
+ uint64_t mached_sess_num = 0;
+ const struct session *sess = NULL;
+ const struct tuple6 *tuple = NULL;
+
+ if (sess_mgr_rt == NULL || opts == NULL || mached_sess_ids == NULL || array_size == 0)
+ {
+ return mached_sess_num;
+ }
+ if (opts->count == 0)
+ {
+ return mached_sess_num;
+ }
+ capacity = session_pool_capacity_size(sess_mgr_rt->sess_pool);
+ if (opts->cursor >= capacity)
+ {
+ return mached_sess_num;
+ }
+
+ max_loop = MIN(capacity, opts->cursor + opts->count);
+ for (uint64_t i = opts->cursor; i < max_loop; i++)
+ {
+ sess = session_pool_get0(sess_mgr_rt->sess_pool, i);
+ tuple = session_get_tuple6(sess);
+ if (session_get_current_state(sess) == SESSION_STATE_INIT)
+ {
+ continue;
+ }
+
+ if ((opts->flags & SESSION_SCAN_TYPE) && opts->type != session_get_type(sess))
+ {
+ continue;
+ }
+ if ((opts->flags & SESSION_SCAN_STATE) && opts->state != session_get_current_state(sess))
+ {
+ continue;
+ }
+ if ((opts->flags & SESSION_SCAN_CREATE_TIME) &&
+ (session_get_timestamp(sess, SESSION_TIMESTAMP_START) < opts->create_time_ms[0] ||
+ session_get_timestamp(sess, SESSION_TIMESTAMP_START) > opts->create_time_ms[1]))
+ {
+ continue;
+ }
+ if ((opts->flags & SESSION_SCAN_LAST_PKT_TIME) &&
+ (session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) < opts->last_pkt_time_ms[0] ||
+ session_get_timestamp(sess, SESSION_TIMESTAMP_LAST) > opts->last_pkt_time_ms[1]))
+ {
+ continue;
+ }
+ if ((opts->flags & SESSION_SCAN_SPORT) && opts->src_port != tuple->src_port)
+ {
+ continue;
+ }
+ if ((opts->flags & SESSION_SCAN_DPORT) && opts->dst_port != tuple->dst_port)
+ {
+ continue;
+ }
+ if (opts->flags & SESSION_SCAN_SIP)
+ {
+ if (opts->addr_family != tuple->addr_family)
+ {
+ continue;
+ }
+ if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->src_addr.v4, &opts->src_addr[0].v4, &opts->src_addr[1].v4))
+ {
+ continue;
+ }
+ if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->src_addr.v6, &opts->src_addr[0].v6, &opts->src_addr[1].v6))
+ {
+ continue;
+ }
+ }
+ if (opts->flags & SESSION_SCAN_DIP)
+ {
+ if (opts->addr_family != tuple->addr_family)
+ {
+ continue;
+ }
+ if ((opts->addr_family == AF_INET) && !ipv4_in_range(&tuple->dst_addr.v4, &opts->dst_addr[0].v4, &opts->dst_addr[1].v4))
+ {
+ continue;
+ }
+ if ((opts->addr_family == AF_INET6) && !ipv6_in_range(&tuple->dst_addr.v6, &opts->dst_addr[0].v6, &opts->dst_addr[1].v6))
+ {
+ continue;
+ }
+ }
+
+ mached_sess_ids[mached_sess_num++] = session_get_id(sess);
+ if (mached_sess_num >= array_size)
+ {
+ break;
+ }
+ }
+
+ SESSION_MANAGER_RUNTIME_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num);
+ return mached_sess_num;
+}
+
+/******************************************************************************
+ * other
+ ******************************************************************************/
+
+void session_set_discard(struct session *sess)
+{
+ struct session_manager_runtime *sess_mgr_rt = sess->sess_mgr_rt;
+ enum session_type type = session_get_type(sess);
+ enum session_state curr_state = session_get_current_state(sess);
+ enum session_state next_state = session_transition_run(curr_state, USER_CLOSE);
+ session_transition_log(sess, curr_state, next_state, USER_CLOSE);
+ session_set_current_state(sess, next_state);
+
+ switch (type)
+ {
+ case SESSION_TYPE_TCP:
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.tcp_timeout_ms.discard_default);
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, tcp);
+ break;
+ case SESSION_TYPE_UDP:
+ session_timer_update(sess_mgr_rt->sess_timer, sess, sess_mgr_rt->now_ms + sess_mgr_rt->cfg.udp_timeout_ms.discard_default);
+ SESS_MGR_STAT_UPDATE(&sess_mgr_rt->stat, curr_state, next_state, udp);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+} \ No newline at end of file
diff --git a/infra/session_manager/session_manager.h b/infra/session_manager/session_manager_runtime.h
index 910c6a7..910c6a7 100644
--- a/infra/session_manager/session_manager.h
+++ b/infra/session_manager/session_manager_runtime.h
diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c
index 062eb53..83d6888 100644
--- a/infra/session_manager/session_utils.c
+++ b/infra/session_manager/session_utils.c
@@ -1,5 +1,6 @@
+#include "stellar/exdata.h"
#include "session_internal.h"
-#include "session_manager.h"
+#include "session_manager_runtime.h"
void session_init(struct session *sess)
{
@@ -208,6 +209,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
{
sess->sess_mgr_stat->tcp_segs_consumed++;
half->in_order_ref++;
+ half->in_order.user_data = sess;
return &half->in_order;
}
else
@@ -221,6 +223,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
// TODO
sess->sess_mgr_stat->tcp_segs_consumed++;
sess->sess_mgr_stat->tcp_segs_reordered++;
+ seg->user_data = sess;
}
return seg;
}
@@ -461,3 +464,15 @@ void session_print(const struct session *sess)
session_to_str(sess, 0, buff, sizeof(buff));
printf("%s\n", buff);
}
+
+void session_set_exdata(struct session *sess, int idx, void *ex_ptr)
+{
+ struct exdata_runtime *rte = (struct exdata_runtime *)session_get_user_data(sess);
+ exdata_set(rte, idx, ex_ptr);
+}
+
+void *session_get_exdata(const struct session *sess, int idx)
+{
+ struct exdata_runtime *rte = (struct exdata_runtime *)session_get_user_data(sess);
+ return exdata_get(rte, idx);
+}
diff --git a/infra/session_manager/test/default_config.h b/infra/session_manager/test/default_config.h
index 729caed..f8ce5fd 100644
--- a/infra/session_manager/test/default_config.h
+++ b/infra/session_manager/test/default_config.h
@@ -7,7 +7,7 @@ extern "C"
{
#endif
-#include "session_manager.h"
+#include "session_manager_runtime.h"
static struct session_manager_config sess_mgr_cfg = {
.session_id_seed = 0xFFFFF,
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index b3e0471..56131d6 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -17,7 +17,7 @@
#include "stellar_stat.h"
#include "packet_internal.h"
#include "session_internal.h"
-#include "session_manager.h"
+#include "session_manager_runtime.h"
#define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__)
#define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__)
diff --git a/infra/stellar_stat.h b/infra/stellar_stat.h
index 3560ce8..7d334f7 100644
--- a/infra/stellar_stat.h
+++ b/infra/stellar_stat.h
@@ -7,7 +7,7 @@ extern "C"
#include "packet_io.h"
#include "ip_reassembly.h"
-#include "session_manager.h"
+#include "session_manager_runtime.h"
struct thread_stat
{
diff --git a/infra/tcp_reassembly/tcp_reassembly.c b/infra/tcp_reassembly/tcp_reassembly.c
index 2591ab2..3eb7f18 100644
--- a/infra/tcp_reassembly/tcp_reassembly.c
+++ b/infra/tcp_reassembly/tcp_reassembly.c
@@ -255,28 +255,4 @@ uint32_t tcp_reassembly_get_recv_next(struct tcp_reassembly *tcp_reass)
}
return tcp_reass->recv_next;
-}
-
-const char *tcp_segment_get_data(const struct tcp_segment *seg)
-{
- if (seg == NULL)
- {
- return NULL;
- }
- else
- {
- return (const char *)seg->data;
- }
-}
-
-uint16_t tcp_segment_get_len(const struct tcp_segment *seg)
-{
- if (seg == NULL)
- {
- return 0;
- }
- else
- {
- return seg->len;
- }
} \ No newline at end of file
diff --git a/infra/tcp_reassembly/tcp_reassembly.h b/infra/tcp_reassembly/tcp_reassembly.h
index 50e369b..87660c4 100644
--- a/infra/tcp_reassembly/tcp_reassembly.h
+++ b/infra/tcp_reassembly/tcp_reassembly.h
@@ -12,6 +12,7 @@ struct tcp_segment
{
uint32_t len;
const void *data;
+ void *user_data;
};
struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len);
diff --git a/infra/version.map b/infra/version.map
index ea3336d..f241d50 100644
--- a/infra/version.map
+++ b/infra/version.map
@@ -18,9 +18,6 @@ global:
packet_build_udp;
packet_build_l3;
- tcp_segment_get_data;
- tcp_segment_get_len;
-
exdata_*;
mq_*;
stellar_module_*;
@@ -57,6 +54,9 @@ global:
log_print;
log_check_level;
+ session_manager_module_on_init;
+ session_manager_module_on_exit;
+
http_message_*;
http_decoder_init;
http_decoder_exit;