summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt1
-rw-r--r--conf/stellar.toml1
-rw-r--r--decoders/CMakeLists.txt2
-rw-r--r--include/stellar/packet.h2
-rw-r--r--include/stellar/session.h11
-rw-r--r--infra/CMakeLists.txt2
-rw-r--r--infra/packet_io/packet_io.c2
-rw-r--r--infra/packet_io/pcap_io.c28
-rw-r--r--infra/packet_io/test/CMakeLists.txt3
-rw-r--r--infra/packet_io/test/conf/pcap_io.toml1
-rw-r--r--infra/packet_manager/packet_manager.c25
-rw-r--r--infra/packet_manager/test/CMakeLists.txt3
-rw-r--r--infra/session_manager/session_internal.h3
-rw-r--r--infra/session_manager/session_manager.c414
-rw-r--r--infra/session_manager/session_manager_rte.c51
-rw-r--r--infra/session_manager/session_manager_rte.h1
-rw-r--r--infra/session_manager/session_utils.c11
-rw-r--r--infra/session_manager/test/CMakeLists.txt4
-rw-r--r--infra/stellar_core.c2
-rw-r--r--infra/tcp_reassembly/tcp_reassembly.h1
-rw-r--r--infra/utils_internal.h2
-rw-r--r--infra/version.map8
-rw-r--r--scripts/stat_format.sh1
-rw-r--r--test/CMakeLists.txt4
-rw-r--r--test/monitor/CMakeLists.txt3
25 files changed, 209 insertions, 377 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9d28378..cb3d02a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -89,4 +89,5 @@ add_subdirectory(tools)
add_subdirectory(test)
install(DIRECTORY DESTINATION log COMPONENT PROGRAM)
+install(DIRECTORY DESTINATION metrics COMPONENT PROGRAM)
install(DIRECTORY DESTINATION module COMPONENT PROGRAM) \ No newline at end of file
diff --git a/conf/stellar.toml b/conf/stellar.toml
index fdd7ba9..20a8b91 100644
--- a/conf/stellar.toml
+++ b/conf/stellar.toml
@@ -7,6 +7,7 @@
dev_symbol = "nf_0_fw"
pcap_path = "/tmp/test.pcap"
pcap_done_exit = 1 # range: [0, 1]
+ pcap_queue_size = 1 # range: [1, 4294967295]
thread_num = 1 # range: [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
idle_yield_ms = 900 # range: [0, 60000] (ms)
diff --git a/decoders/CMakeLists.txt b/decoders/CMakeLists.txt
index 7946822..0875131 100644
--- a/decoders/CMakeLists.txt
+++ b/decoders/CMakeLists.txt
@@ -1,4 +1,4 @@
-add_subdirectory(lpi_plus)
+#add_subdirectory(lpi_plus)
#add_subdirectory(http)
#add_subdirectory(socks)
#add_subdirectory(stratum)
diff --git a/include/stellar/packet.h b/include/stellar/packet.h
index c20e1b5..ec49227 100644
--- a/include/stellar/packet.h
+++ b/include/stellar/packet.h
@@ -227,6 +227,8 @@ struct packet *packet_manager_build_udp_packet(struct packet_manager *pkt_mgr, c
const char *udp_payload, uint16_t udp_payload_len);
struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt,
uint8_t ip_proto, const char *l3_payload, uint16_t l3_payload_len);
+struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt);
+void packet_manager_free_packet(struct packet_manager *pkt_mgr, struct packet *pkt);
#ifdef __cplusplus
}
diff --git a/include/stellar/session.h b/include/stellar/session.h
index 68a69c1..a5dae67 100644
--- a/include/stellar/session.h
+++ b/include/stellar/session.h
@@ -130,7 +130,6 @@ int session_has_duplicate_traffic(const struct session *sess);
enum session_type session_get_type(const struct session *sess);
enum session_state session_get_current_state(const struct session *sess);
-const struct packet *session_get_current_packet(const struct session *sess);
enum closing_reason session_get_closing_reason(const struct session *sess);
enum session_direction session_get_direction(const struct session *sess);
@@ -152,15 +151,7 @@ struct session_manager;
struct session_manager *module_to_session_manager(struct module *mod);
int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg);
-// When the state is SESSION_STATE_CLOSED, the packet is NULL, and the session will be destroyed.
-typedef void on_session_message_callback(struct session *sess, enum session_state state, struct packet *pkt, void *args);
-// When the state is SESSION_STATE_CLOSED, the tcp_payload is NULL, and the session will be destroyed.
-typedef void on_tcp_payload_callback(struct session *sess, enum session_state state, const char *tcp_payload, uint32_t tcp_payload_len, void *args);
-
-int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args);
-int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args);
-int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args);
-int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args);
+struct session *packet_exdata_to_session(struct packet *pkt);
#ifdef __cplusplus
}
diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt
index fc0ad21..e76a214 100644
--- a/infra/CMakeLists.txt
+++ b/infra/CMakeLists.txt
@@ -1,6 +1,6 @@
set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf)
-set(DECODERS lpi_plus)
+#set(DECODERS lpi_plus)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})
set(LIBS fieldstat4)
diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c
index e0a7e1c..533ce8b 100644
--- a/infra/packet_io/packet_io.c
+++ b/infra/packet_io/packet_io.c
@@ -161,7 +161,7 @@ struct packet_io *packet_io_new(const char *toml_file)
PACKET_IO_LOG_ERROR("failed to create fieldstat_easy");
goto error_out;
}
- if (fieldstat_easy_enable_auto_output(pkt_io->fs, "packet_io.fs4", 2) != 0)
+ if (fieldstat_easy_enable_auto_output(pkt_io->fs, "metrics/packet_io.json", 2) != 0)
{
PACKET_IO_LOG_ERROR("failed to enable auto output for fieldstat_easy");
goto error_out;
diff --git a/infra/packet_io/pcap_io.c b/infra/packet_io/pcap_io.c
index 555b59a..1fc9c6d 100644
--- a/infra/packet_io/pcap_io.c
+++ b/infra/packet_io/pcap_io.c
@@ -3,6 +3,7 @@
#include <limits.h>
#include <pthread.h>
#include <pcap/pcap.h>
+#include <sys/prctl.h>
#include "pcap_io.h"
#include "packet_dump.h"
@@ -11,8 +12,6 @@
#include "packet_internal.h"
#include "utils_internal.h"
-#define RING_BUFFER_MAX_SIZE (4096 * 1000)
-
struct pcap_pkt
{
char *data;
@@ -24,8 +23,9 @@ struct pcap_io_cfg
{
char mode[16]; // pcapfile, pcaplist
char pcap_path[PATH_MAX];
- uint64_t pcap_done_exit; // range [0, 1]
- uint64_t thread_num; // range [1, MAX_THREAD_NUM]
+ uint64_t pcap_done_exit; // range [0, 1]
+ uint64_t pcap_queue_size; // range [1, 4294967295]
+ uint64_t thread_num; // range [1, MAX_THREAD_NUM]
// packet pool
uint64_t capacity; // range: [1, 4294967295]
@@ -140,6 +140,7 @@ static struct pcap_io_cfg *pcap_io_cfg_new(const char *toml_file)
ret += load_toml_str_config(toml_file, "packet_io.mode", cfg->mode);
ret += load_toml_str_config(toml_file, "packet_io.pcap_path", cfg->pcap_path);
ret += load_toml_integer_config(toml_file, "packet_io.pcap_done_exit", &cfg->pcap_done_exit, 0, 1);
+ ret += load_toml_integer_config(toml_file, "packet_io.pcap_queue_size", &cfg->pcap_queue_size, 1, 4294967295);
ret += load_toml_integer_config(toml_file, "packet_io.thread_num", &cfg->thread_num, 1, MAX_THREAD_NUM);
ret += load_toml_integer_config(toml_file, "packet_io.packet_pool.capacity", &cfg->capacity, 1, 4294967295);
if (strcmp(cfg->mode, "pcapfile") != 0 && strcmp(cfg->mode, "pcaplist") != 0)
@@ -249,12 +250,23 @@ static int pcap_io_handler(struct pcap_io *pcap_io, const char *pcap_file)
static int all_packet_consumed(struct pcap_io *pcap_io)
{
uint64_t consumed_pkts = 0;
+ uint64_t total_tx_pkts = 0;
+ uint64_t total_dropped_pkts = 0;
+ uint64_t total_injected_pkts = 0;
+ uint64_t total_user_freed_pkts = 0;
+
uint64_t read_pcap_pkts = ATOMIC_READ(&pcap_io->read_pcap_pkts);
for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++)
{
- consumed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_rx);
+ total_tx_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_tx);
+ total_dropped_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_dropped);
+ total_injected_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_injected);
+ total_user_freed_pkts += ATOMIC_READ(&pcap_io->stat[i].pkts_user_freed);
}
- if (consumed_pkts == read_pcap_pkts)
+
+ consumed_pkts = total_tx_pkts + total_dropped_pkts + total_user_freed_pkts - total_injected_pkts;
+
+ if (consumed_pkts >= read_pcap_pkts)
{
return 1;
}
@@ -269,6 +281,8 @@ static void *pcap_io_thread(void *arg)
struct pcap_io *pcap_io = (struct pcap_io *)arg;
__thread_local_logger = pcap_io->logger;
+ prctl(PR_SET_NAME, "stellar:pcap", NULL, NULL, NULL);
+
ATOMIC_SET(&pcap_io->io_thread_is_runing, 1);
PACKET_IO_LOG_FATAL("pcap io thread is running");
@@ -373,7 +387,7 @@ void *pcap_io_new(const char *toml_file)
for (uint16_t i = 0; i < pcap_io->cfg->thread_num; i++)
{
- pcap_io->ring[i] = ring_buffer_new(RING_BUFFER_MAX_SIZE);
+ pcap_io->ring[i] = ring_buffer_new(pcap_io->cfg->pcap_queue_size);
if (pcap_io->ring[i] == NULL)
{
PACKET_IO_LOG_ERROR("unable to create ring buffer");
diff --git a/infra/packet_io/test/CMakeLists.txt b/infra/packet_io/test/CMakeLists.txt
index 740dd40..be675ac 100644
--- a/infra/packet_io/test/CMakeLists.txt
+++ b/infra/packet_io/test/CMakeLists.txt
@@ -5,4 +5,5 @@ include(GoogleTest)
gtest_discover_tests(gtest_packet_io)
file(COPY ./conf/ DESTINATION ./conf/)
-file(COPY ./pcap/ DESTINATION ./pcap/) \ No newline at end of file
+file(COPY ./pcap/ DESTINATION ./pcap/)
+file(COPY ./metrics/ DESTINATION ./metrics/) \ No newline at end of file
diff --git a/infra/packet_io/test/conf/pcap_io.toml b/infra/packet_io/test/conf/pcap_io.toml
index 3bf7cfc..35943f7 100644
--- a/infra/packet_io/test/conf/pcap_io.toml
+++ b/infra/packet_io/test/conf/pcap_io.toml
@@ -4,6 +4,7 @@
dev_symbol = "nf_0_fw"
pcap_path = "./pcap/IPv4_frags_UDP.pcap"
pcap_done_exit = 1 # range: [0, 1]
+ pcap_queue_size = 1024 # range: [1, 4294967295]
thread_num = 1 # range: [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
idle_yield_ms = 900 # range: [0, 60000] (ms)
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c
index bb4a0c1..e776a85 100644
--- a/infra/packet_manager/packet_manager.c
+++ b/infra/packet_manager/packet_manager.c
@@ -227,7 +227,7 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_sche, uint16_t th
PACKET_MANAGER_LOG_ERROR("failed to create fieldstat_easy");
goto error_out;
}
- if (fieldstat_easy_enable_auto_output(pkt_mgr->fs, "packet_manager.fs4", 2) != 0)
+ if (fieldstat_easy_enable_auto_output(pkt_mgr->fs, "metrics/packet_manager.json", 2) != 0)
{
PACKET_MANAGER_LOG_ERROR("failed to enable auto output for fieldstat_easy");
goto error_out;
@@ -477,6 +477,20 @@ struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, co
return pkt;
}
+struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, const struct packet *origin_pkt)
+{
+ struct packet *pkt = packet_dup(origin_pkt);
+ if (pkt == NULL)
+ {
+ return NULL;
+ }
+
+ struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche);
+ packet_set_user_data(pkt, ex_rte);
+
+ return pkt;
+}
+
void packet_manager_free_packet(struct packet_manager *pkt_mgr __attribute__((unused)), struct packet *pkt)
{
if (pkt)
@@ -581,8 +595,9 @@ struct module *packet_manager_on_thread_init(struct module_manager *mod_mgr, int
void packet_manager_on_thread_exit(struct module_manager *mod_mgr __attribute__((unused)), int thread_id, struct module *mod)
{
struct packet_manager *pkt_mgr = module_get_ctx(mod);
- assert(pkt_mgr);
- assert(thread_id < pkt_mgr->thread_num);
-
- packet_manager_clean(pkt_mgr, thread_id);
+ if (pkt_mgr)
+ {
+ assert(thread_id < pkt_mgr->thread_num);
+ packet_manager_clean(pkt_mgr, thread_id);
+ }
} \ No newline at end of file
diff --git a/infra/packet_manager/test/CMakeLists.txt b/infra/packet_manager/test/CMakeLists.txt
index 184d9fc..916e1d5 100644
--- a/infra/packet_manager/test/CMakeLists.txt
+++ b/infra/packet_manager/test/CMakeLists.txt
@@ -84,4 +84,5 @@ gtest_discover_tests(gtest_packet_ldbc)
gtest_discover_tests(gtest_packet_pool)
gtest_discover_tests(gtest_packet_manager)
-file(COPY ../../../conf/ DESTINATION ./conf/) \ No newline at end of file
+file(COPY ../../../conf/ DESTINATION ./conf/)
+file(COPY ./metrics/ DESTINATION ./metrics/) \ No newline at end of file
diff --git a/infra/session_manager/session_internal.h b/infra/session_manager/session_internal.h
index 2201315..ec8a0b8 100644
--- a/infra/session_manager/session_internal.h
+++ b/infra/session_manager/session_internal.h
@@ -48,7 +48,6 @@ struct session
uint64_t timestamps[MAX_TIMESTAMP]; // realtime msec
struct tcp_half tcp_halfs[MAX_FLOW_TYPE];
struct timeout timeout;
- struct tcp_segment empty_seg;
TAILQ_ENTRY(session) lru_tqe;
TAILQ_ENTRY(session) free_tqe;
TAILQ_ENTRY(session) evc_tqe;
@@ -120,7 +119,7 @@ void session_set_first_packet(struct session *sess, enum flow_type type, const s
// const struct packet *session_get_first_packet(const struct session *sess, enum flow_type type);
void session_set_current_packet(struct session *sess, const struct packet *pkt);
-// const struct packet *session_get_current_packet(const struct session *sess);
+const struct packet *session_get_current_packet(const struct session *sess);
// int session_is_symmetric(const struct session *sess, unsigned char *flag);
diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c
index 4358a4d..96d421c 100644
--- a/infra/session_manager/session_manager.c
+++ b/infra/session_manager/session_manager.c
@@ -2,6 +2,7 @@
#include "utils_internal.h"
#include "session_internal.h"
+#include "session_manager.h"
#include "session_manager_log.h"
#include "session_manager_cfg.h"
#include "session_manager_rte.h"
@@ -12,89 +13,36 @@
#pragma GCC diagnostic ignored "-Wunused-parameter"
-struct session_manager_sche
+struct session_manager
{
- int pkt_ex_id;
- int sess_msg_id_tcp;
- int sess_msg_id_udp;
- int sess_msg_id_ctrl;
- int sess_msg_id_stream;
-
- struct mq_schema *mq_sche;
+ int pkt_ex_to_get_sess;
+ int pkt_ex_to_free_sess;
struct exdata_schema *ex_sche;
-};
-struct session_manager
-{
int stat_idx[SESS_MGR_STAT_MAX];
struct fieldstat_easy *fs;
struct session_manager_cfg *cfg;
- struct session_manager_sche *sche;
struct session_manager_rte *rte[MAX_THREAD_NUM];
- struct mq_runtime *mq[MAX_THREAD_NUM];
struct module_manager *mod_mgr;
+ struct packet_manager *pkt_mgr;
};
+__thread int __thread_pkt_ex_to_get_sess = 0;
+
/******************************************************************************
* session manager sche
******************************************************************************/
-static void clean_closed_session(struct session_manager *sess_mgr, uint16_t thread_id, uint64_t now_ms)
-{
- struct session *sess = NULL;
- struct session *cleaned[CLEAN_SESSION_BURST] = {NULL};
- struct mq_runtime *mq_rte = sess_mgr->mq[thread_id];
- struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
-
- uint64_t used = session_manager_rte_clean_session(sess_mgr_rte, now_ms, cleaned, CLEAN_SESSION_BURST);
- for (uint64_t i = 0; i < used; i++)
- {
- sess = cleaned[i];
- assert(session_get_current_state(sess) == SESSION_STATE_CLOSED);
- session_set_current_packet(sess, NULL);
- session_set_flow_type(sess, FLOW_TYPE_NONE);
-
- if (session_get_type(sess) == SESSION_TYPE_TCP)
- {
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_stream, &sess->empty_seg);
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess);
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_tcp, sess);
- }
- else
- {
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess);
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_udp, sess);
- }
- }
-}
-
-static void on_sess_msg_dispatch(int sess_msg_id, void *msg, on_msg_cb_func *msg_cb, void *msg_cb_args, void *dispatch_args)
+static void free_session(int idx, void *ex_ptr, void *arg)
{
- struct session *sess = (struct session *)msg;
- struct packet *pkt = (struct packet *)session_get_current_packet(sess);
- enum session_state state = session_get_current_state(sess);
+ struct session *sess = (struct session *)ex_ptr;
+ struct session_manager *sess_mgr = (struct session_manager *)arg;
+ assert(idx == sess_mgr->pkt_ex_to_free_sess);
- ((on_session_message_callback *)(void *)msg_cb)(sess, state, pkt, msg_cb_args);
-}
-
-static void on_tcp_payload_msg_dispatch(int sess_msg_id, void *msg, on_msg_cb_func *msg_cb, void *msg_cb_args, void *dispatch_args)
-{
- struct tcp_segment *seg = (struct tcp_segment *)msg;
- struct session *sess = (struct session *)seg->user_data;
- enum session_state state = session_get_current_state(sess);
-
- ((on_tcp_payload_callback *)(void *)msg_cb)(sess, state, seg->data, seg->len, msg_cb_args);
-}
-
-static void on_sess_msg_free(void *msg, void *args)
-{
- struct session *sess = (struct session *)msg;
-
- if (session_get_current_state(sess) == SESSION_STATE_CLOSED)
+ if (sess && session_get_current_state(sess) == SESSION_STATE_CLOSED)
{
- struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr);
- struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
+ struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
char buffer[4096] = {0};
session_to_str(sess, 0, buffer, sizeof(buffer));
@@ -106,20 +54,34 @@ static void on_sess_msg_free(void *msg, void *args)
}
}
-static void on_tcp_payload_msg_free(void *msg, void *args)
+static void notify_sess_closed_by_pseudo_pkt(struct session_manager *sess_mgr, int thread_id, struct session *sess)
{
- struct tcp_segment *seg = (struct tcp_segment *)msg;
- struct session *sess = (struct session *)seg->user_data;
+ struct packet *pseudo = NULL;
+ struct packet_manager *pkt_mgr = sess_mgr->pkt_mgr;
+ assert(session_get_current_state(sess) == SESSION_STATE_CLOSED);
- session_free_tcp_segment(sess, seg);
+ if (session_get_first_packet(sess, FLOW_TYPE_C2S))
+ {
+ pseudo = packet_manager_dup_packet(pkt_mgr, session_get_first_packet(sess, FLOW_TYPE_C2S));
+ }
+ else
+ {
+ pseudo = packet_manager_dup_packet(pkt_mgr, session_get_first_packet(sess, FLOW_TYPE_S2C));
+ }
+
+ assert(pseudo);
+ packet_set_type(pseudo, PACKET_TYPE_PSEUDO);
+ packet_set_action(pseudo, PACKET_ACTION_DROP);
+ packet_set_exdata(pseudo, sess_mgr->pkt_ex_to_free_sess, sess);
+ packet_manager_schedule_packet(pkt_mgr, thread_id, pseudo, PACKET_STAGE_FORWARD);
+ SESSION_MANAGER_LOG_INFO("notify session %lu %s closed by pseudo packet: %p", session_get_id(sess), session_get_readable_addr(sess), pseudo);
}
static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr);
- struct mq_runtime *mq_rte = sess_mgr->mq[thread_id];
- struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
+ struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
/*
* We use the system's real time instead of monotonic time for the following reasons:
@@ -132,86 +94,62 @@ static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void
uint64_t now_ms = clock_get_real_time_ms();
struct tuple6 key;
- struct tcp_segment *seg = NULL;
struct session *sess = session_manager_rte_lookup_session_by_packet(sess_mgr_rte, pkt);
if (sess == NULL)
{
- if (packet_get_type(pkt) == PACKET_TYPE_PSEUDO)
- {
- goto fast_path;
- }
-
- sess = session_manager_rte_new_session(sess_mgr_rte, pkt, now_ms);
- if (sess == NULL)
+ if (packet_get_type(pkt) == PACKET_TYPE_RAW)
{
- goto fast_path;
+ sess = session_manager_rte_new_session(sess_mgr_rte, pkt, now_ms);
+ if (sess)
+ {
+ session_set_user_data(sess, exdata_runtime_new(sess_mgr->ex_sche));
+ }
}
else
{
- session_set_user_data(sess, exdata_runtime_new(sess_mgr->sche->ex_sche));
- goto slow_path;
+ // TODO new session by pseudo packet
}
}
else
{
if (packet_get_type(pkt) == PACKET_TYPE_PSEUDO)
{
- goto ctrl_path;
- }
-
- if (session_manager_rte_update_session(sess_mgr_rte, sess, pkt, now_ms) == -1)
- {
- goto fast_path;
+ session_set_current_packet(sess, pkt);
+ packet_get_innermost_tuple6(pkt, &key);
+ if (tuple6_cmp(session_get_tuple6(sess), &key) == 0)
+ {
+ session_set_flow_type(sess, FLOW_TYPE_C2S);
+ sess->stats[FLOW_TYPE_C2S][STAT_PSEUDO_PACKETS_RECEIVED]++;
+ sess->stats[FLOW_TYPE_C2S][STAT_PSEUDO_BYTES_RECEIVED] += packet_get_raw_len(pkt);
+ }
+ else
+ {
+ session_set_flow_type(sess, FLOW_TYPE_S2C);
+ sess->stats[FLOW_TYPE_S2C][STAT_PSEUDO_PACKETS_RECEIVED]++;
+ sess->stats[FLOW_TYPE_S2C][STAT_PSEUDO_BYTES_RECEIVED] += packet_get_raw_len(pkt);
+ }
}
else
{
- goto slow_path;
+ session_manager_rte_update_session(sess_mgr_rte, sess, pkt, now_ms);
}
}
-ctrl_path:
- session_set_current_packet(sess, pkt);
- packet_get_innermost_tuple6(pkt, &key);
- if (tuple6_cmp(session_get_tuple6(sess), &key) == 0)
- {
- session_set_flow_type(sess, FLOW_TYPE_C2S);
- }
- else
- {
- session_set_flow_type(sess, FLOW_TYPE_S2C);
- }
- packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, sess);
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_ctrl, sess);
- return;
+ packet_set_exdata(pkt, sess_mgr->pkt_ex_to_get_sess, sess);
-slow_path:
- if (session_get_type(sess) == SESSION_TYPE_TCP)
+ while ((sess = session_manager_rte_get_evicted_session(sess_mgr_rte)))
{
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_tcp, sess);
- while ((seg = session_get_tcp_segment(sess)))
- {
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_stream, seg);
- }
+ notify_sess_closed_by_pseudo_pkt(sess_mgr, thread_id, sess);
}
- else
- {
- mq_runtime_publish_message(mq_rte, sess_mgr->sche->sess_msg_id_udp, sess);
- }
- packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, sess);
- return;
-
-fast_path:
- packet_set_exdata(pkt, sess_mgr->sche->pkt_ex_id, NULL);
- return;
}
static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr);
- struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
+ struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
- struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->sche->pkt_ex_id);
+ struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->pkt_ex_to_get_sess);
if (sess)
{
struct tuple6 key;
@@ -255,18 +193,35 @@ static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *
static void on_polling(struct module_manager *mod_mgr, void *args)
{
+ struct session *sess = NULL;
uint64_t now_ms = clock_get_real_time_ms();
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = module_manager_get_thread_id(mod_mgr);
- struct session_manager_rte *sess_mgr_rte = sess_mgr->rte[thread_id];
- struct session_manager_stat *sess_mgr_stat = session_manager_rte_get_stat(sess_mgr_rte);
+ struct session_manager_rte *sess_mgr_rte = session_manager_get_rte(sess_mgr, thread_id);
- clean_closed_session(sess_mgr, thread_id, now_ms);
+ static __thread uint64_t last_clean_expired_sess_ts = 0;
+ if (now_ms - last_clean_expired_sess_ts > sess_mgr->cfg->expire_period_ms)
+ {
+ for (uint64_t i = 0; i < sess_mgr->cfg->expire_batch_max; i++)
+ {
+ sess = session_manager_rte_get_expired_session(sess_mgr_rte, now_ms);
+ if (sess)
+ {
+ notify_sess_closed_by_pseudo_pkt(sess_mgr, thread_id, sess);
+ }
+ else
+ {
+ break;
+ }
+ }
+ last_clean_expired_sess_ts = now_ms;
+ }
static __thread uint64_t last_sync_stat_ms = 0;
static __thread struct session_manager_stat last_stat = {0};
if (now_ms - last_sync_stat_ms >= SYNC_STAT_INTERVAL_MS)
{
+ struct session_manager_stat *sess_mgr_stat = session_manager_rte_get_stat(sess_mgr_rte);
for (int i = 0; i < SESS_MGR_STAT_MAX; i++)
{
uint64_t val = session_manager_stat_get(sess_mgr_stat, i) - session_manager_stat_get(&last_stat, i);
@@ -277,100 +232,6 @@ static void on_polling(struct module_manager *mod_mgr, void *args)
}
}
-static void session_manager_sche_free(struct session_manager_sche *sess_mgr_schema)
-{
- if (sess_mgr_schema)
- {
- if (sess_mgr_schema->mq_sche)
- {
- mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_tcp);
- mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_udp);
- mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_ctrl);
- mq_schema_destroy_topic(sess_mgr_schema->mq_sche, sess_mgr_schema->sess_msg_id_stream);
- }
- exdata_schema_free(sess_mgr_schema->ex_sche);
-
- free(sess_mgr_schema);
- sess_mgr_schema = NULL;
- }
-}
-
-static struct session_manager_sche *session_manager_sche_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_sche, void *sess_mgr)
-{
- if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr))
- {
- SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
- return NULL;
- }
- if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr))
- {
- SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
- return NULL;
- }
-
- struct session_manager_sche *sess_mgr_schema = calloc(1, sizeof(struct session_manager_sche));
- if (sess_mgr_schema == NULL)
- {
- SESSION_MANAGER_LOG_ERROR("failed to allocate memory for session_manager_sche");
- return NULL;
- }
-
- sess_mgr_schema->ex_sche = exdata_schema_new();
- if (sess_mgr_schema->ex_sche == NULL)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema");
- goto error_out;
- }
-
- sess_mgr_schema->mq_sche = mq_sche;
- sess_mgr_schema->pkt_ex_id = packet_manager_new_packet_exdata_index(pkt_mgr, "session_manager", NULL, NULL);
- if (sess_mgr_schema->pkt_ex_id == -1)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
- goto error_out;
- }
-
- /*
- * Publish session closed messages to multiple topics.
- * Each topic has its own session message free callback.
- * To prevent the same session from being freeed multiple times,
- * only TCP/UDP topics register session message free callbacks,
- * and other topics do not register session message callbacks;
- *
- * Restriction: MQ ensures that the session message free order is consistent with the publishing order
- */
- sess_mgr_schema->sess_msg_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_TCP", &on_sess_msg_dispatch, NULL, &on_sess_msg_free, sess_mgr);
- if (sess_mgr_schema->sess_msg_id_tcp == -1)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_FREE");
- goto error_out;
- }
- sess_mgr_schema->sess_msg_id_udp = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_UDP", &on_sess_msg_dispatch, NULL, &on_sess_msg_free, sess_mgr);
- if (sess_mgr_schema->sess_msg_id_udp == -1)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_UDP");
- goto error_out;
- }
- sess_mgr_schema->sess_msg_id_ctrl = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_CTRL_PKT", &on_sess_msg_dispatch, NULL, NULL, NULL);
- if (sess_mgr_schema->sess_msg_id_ctrl == -1)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_CTRL_PKT");
- goto error_out;
- }
- sess_mgr_schema->sess_msg_id_stream = mq_schema_create_topic(sess_mgr_schema->mq_sche, "SESSION_MESSAGE_TCP_STREAM", &on_tcp_payload_msg_dispatch, NULL, &on_tcp_payload_msg_free, sess_mgr);
- if (sess_mgr_schema->sess_msg_id_stream == -1)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create topic SESSION_MESSAGE_TCP_STREAM");
- goto error_out;
- }
-
- return sess_mgr_schema;
-
-error_out:
- session_manager_sche_free(sess_mgr_schema);
- return NULL;
-}
-
/******************************************************************************
* session manager
******************************************************************************/
@@ -379,9 +240,9 @@ void session_manager_free(struct session_manager *sess_mgr)
{
if (sess_mgr)
{
- if (sess_mgr->sche)
+ if (sess_mgr->ex_sche)
{
- session_manager_sche_free(sess_mgr->sche);
+ exdata_schema_free(sess_mgr->ex_sche);
}
if (sess_mgr->fs)
{
@@ -395,7 +256,7 @@ void session_manager_free(struct session_manager *sess_mgr)
}
}
-static struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file)
+static struct session_manager *session_manager_new(struct packet_manager *pkt_mgr, const char *toml_file)
{
struct session_manager *sess_mgr = calloc(1, sizeof(struct session_manager));
if (sess_mgr == NULL)
@@ -422,18 +283,45 @@ static struct session_manager *session_manager_new(struct packet_manager *pkt_mg
{
sess_mgr->stat_idx[i] = fieldstat_easy_register_counter(sess_mgr->fs, sess_mgr_stat_str[i]);
}
- if (fieldstat_easy_enable_auto_output(sess_mgr->fs, "session_manager.fs4", 2) != 0)
+ if (fieldstat_easy_enable_auto_output(sess_mgr->fs, "metrics/session_manager.json", 2) != 0)
{
SESSION_MANAGER_LOG_ERROR("failed to enable auto output");
goto error_out;
}
- sess_mgr->sche = session_manager_sche_new(pkt_mgr, mq_schema, sess_mgr);
- if (sess_mgr->sche == NULL)
+ if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr))
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
+ goto error_out;
+ }
+ if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr))
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
+ goto error_out;
+ }
+
+ sess_mgr->ex_sche = exdata_schema_new();
+ if (sess_mgr->ex_sche == NULL)
{
+ SESSION_MANAGER_LOG_ERROR("failed to create exdata_schema");
+ goto error_out;
+ }
+
+ sess_mgr->pkt_ex_to_get_sess = packet_manager_new_packet_exdata_index(pkt_mgr, "pkt_ex_key_for_get_sess", NULL, NULL);
+ if (sess_mgr->pkt_ex_to_get_sess == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
+ goto error_out;
+ }
+
+ sess_mgr->pkt_ex_to_free_sess = packet_manager_new_packet_exdata_index(pkt_mgr, "pkt_ex_key_for_free_sess", free_session, sess_mgr);
+ if (sess_mgr->pkt_ex_to_free_sess == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create packet exdata index");
goto error_out;
}
+ sess_mgr->pkt_mgr = pkt_mgr;
return sess_mgr;
error_out:
@@ -446,44 +334,15 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c
assert(sess_mgr);
assert(name);
assert(func);
- return exdata_schema_new_index(sess_mgr->sche->ex_sche, name, func, arg);
+ return exdata_schema_new_index(sess_mgr->ex_sche, name, func, arg);
}
-int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
-{
- assert(sess_mgr);
- assert(cb);
- return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_tcp, (on_msg_cb_func *)(void *)cb, args);
-}
-
-int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
-{
- assert(sess_mgr);
- assert(cb);
- return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_udp, (on_msg_cb_func *)(void *)cb, args);
-}
-
-int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
-{
- assert(sess_mgr);
- assert(cb);
- return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_ctrl, (on_msg_cb_func *)(void *)cb, args);
-}
-
-int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args)
-{
- assert(sess_mgr);
- assert(cb);
- return mq_schema_subscribe(sess_mgr->sche->mq_sche, sess_mgr->sche->sess_msg_id_stream, (on_msg_cb_func *)(void *)cb, args);
-}
-
-int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id, struct mq_runtime *mq_rte)
+int session_manager_init(struct session_manager *sess_mgr, uint16_t thread_id)
{
assert(sess_mgr);
uint64_t now_ms = clock_get_real_time_ms();
sess_mgr->cfg->session_id_seed = sess_mgr->cfg->instance_id << 8 | thread_id;
- sess_mgr->mq[thread_id] = mq_rte;
sess_mgr->rte[thread_id] = session_manager_rte_new(sess_mgr->cfg, now_ms);
if (sess_mgr->rte[thread_id] == NULL)
{
@@ -500,22 +359,26 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
{
assert(sess_mgr);
- struct mq_runtime *mq_rte = sess_mgr->mq[thread_id];
- if (sess_mgr->rte[thread_id] == NULL)
+ struct session_manager_rte *rte = session_manager_get_rte(sess_mgr, thread_id);
+
+ if (rte == NULL)
{
return;
}
- struct session_manager_stat *stat = session_manager_rte_get_stat(sess_mgr->rte[thread_id]);
+ struct session *sess = NULL;
+ struct session_manager_stat *stat = session_manager_rte_get_stat(rte);
+
while (stat->tcp_sess_used || stat->udp_sess_used)
{
- clean_closed_session(sess_mgr, thread_id, UINT64_MAX);
- // here we need to dispatch the message to ensure that the session is cleaned up
- mq_runtime_dispatch(mq_rte);
+ while ((sess = session_manager_rte_get_expired_session(rte, UINT64_MAX)))
+ {
+ exdata_runtime_free((struct exdata_runtime *)session_get_user_data(sess));
+ session_manager_rte_free_session(rte, sess);
+ }
}
- session_manager_rte_free(sess_mgr->rte[thread_id]);
- sess_mgr->rte[thread_id] = NULL;
+ session_manager_rte_free(rte);
}
/******************************************************************************
@@ -535,12 +398,10 @@ struct module *session_manager_on_init(struct module_manager *mod_mgr)
struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME);
struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod);
assert(pkt_mgr);
- struct mq_schema *mq_sche = module_manager_get_mq_schema(mod_mgr);
- assert(mq_sche);
const char *toml_file = module_manager_get_toml_path(mod_mgr);
assert(toml_file);
- struct session_manager *sess_mgr = session_manager_new(pkt_mgr, mq_sche, toml_file);
+ struct session_manager *sess_mgr = session_manager_new(pkt_mgr, toml_file);
if (sess_mgr == NULL)
{
return NULL;
@@ -575,10 +436,9 @@ struct module *session_manager_on_thread_init(struct module_manager *mod_mgr, in
{
struct session_manager *sess_mgr = module_get_ctx(mod);
assert(sess_mgr);
- struct mq_runtime *mq_rte = module_manager_get_mq_runtime(mod_mgr);
- assert(mq_rte);
- if (session_manager_init(sess_mgr, thread_id, mq_rte) != 0)
+ __thread_pkt_ex_to_get_sess = sess_mgr->pkt_ex_to_get_sess;
+ if (session_manager_init(sess_mgr, thread_id) != 0)
{
SESSION_MANAGER_LOG_ERROR("failed to int session_manager_init");
return NULL;
@@ -592,10 +452,11 @@ struct module *session_manager_on_thread_init(struct module_manager *mod_mgr, in
void session_manager_on_thread_exit(struct module_manager *mod_mgr, int thread_id, struct module *mod)
{
struct session_manager *sess_mgr = module_get_ctx(mod);
- assert(sess_mgr);
- assert(thread_id < (int)sess_mgr->cfg->thread_num);
-
- session_manager_clean(sess_mgr, thread_id);
+ if (sess_mgr)
+ {
+ assert(thread_id < (int)sess_mgr->cfg->thread_num);
+ session_manager_clean(sess_mgr, thread_id);
+ }
}
struct session_manager_rte *session_manager_get_rte(struct session_manager *sess_mgr, uint16_t thread_id)
@@ -609,4 +470,9 @@ struct session_manager_cfg *session_manager_get_cfg(struct session_manager *sess
{
assert(sess_mgr);
return sess_mgr->cfg;
+}
+
+struct session *packet_exdata_to_session(struct packet *pkt)
+{
+ return (struct session *)packet_get_exdata(pkt, __thread_pkt_ex_to_get_sess);
} \ No newline at end of file
diff --git a/infra/session_manager/session_manager_rte.c b/infra/session_manager/session_manager_rte.c
index 2c534a1..c3eb668 100644
--- a/infra/session_manager/session_manager_rte.c
+++ b/infra/session_manager/session_manager_rte.c
@@ -34,7 +34,6 @@ struct session_manager_rte
// only used for session_set_discard() or session_manager_rte_record_duplicated_packet(), because the function is called by plugin and has no time input.
uint64_t now_ms;
- uint64_t last_clean_expired_sess_ts;
struct snowflake *sf;
};
@@ -810,7 +809,6 @@ struct session_manager_rte *session_manager_rte_new(const struct session_manager
TAILQ_INIT(&sess_mgr_rte->evc_list);
session_transition_init();
sess_mgr_rte->now_ms = now_ms;
- sess_mgr_rte->last_clean_expired_sess_ts = now_ms;
return sess_mgr_rte;
@@ -1064,55 +1062,6 @@ struct session *session_manager_rte_get_evicted_session(struct session_manager_r
return sess;
}
-uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size)
-{
- sess_mgr_rte->now_ms = now_ms;
- struct session *sess = NULL;
- uint64_t cleaned_sess_num = 0;
- uint64_t expired_sess_num = 0;
-
- uint8_t expired_sess_canbe_clean = 0;
- if (now_ms - sess_mgr_rte->last_clean_expired_sess_ts >= sess_mgr_rte->cfg.expire_period_ms ||
- now_ms == UINT64_MAX)
- {
- expired_sess_canbe_clean = 1;
- }
-
- for (uint64_t i = 0; i < array_size; i++)
- {
- // frist clean evicted session
- sess = session_manager_rte_get_evicted_session(sess_mgr_rte);
- if (sess)
- {
- cleaned_sess_ptr[cleaned_sess_num++] = sess;
- }
- // then clean expired session
- else
- {
- if (expired_sess_canbe_clean && expired_sess_num < sess_mgr_rte->cfg.expire_batch_max)
- {
- sess_mgr_rte->last_clean_expired_sess_ts = now_ms;
- sess = session_manager_rte_get_expired_session(sess_mgr_rte, now_ms);
- if (sess)
- {
- cleaned_sess_ptr[cleaned_sess_num++] = sess;
- expired_sess_num++;
- }
- else
- {
- break;
- }
- }
- else
- {
- break;
- }
- }
- }
-
- return cleaned_sess_num;
-}
-
uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size)
{
uint64_t capacity = 0;
diff --git a/infra/session_manager/session_manager_rte.h b/infra/session_manager/session_manager_rte.h
index 8adae88..9a3e9e5 100644
--- a/infra/session_manager/session_manager_rte.h
+++ b/infra/session_manager/session_manager_rte.h
@@ -43,7 +43,6 @@ int session_manager_rte_update_session(struct session_manager_rte *sess_mgr_rte,
struct session *session_manager_rte_get_expired_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms);
struct session *session_manager_rte_get_evicted_session(struct session_manager_rte *sess_mgr_rte);
-uint64_t session_manager_rte_clean_session(struct session_manager_rte *sess_mgr_rte, uint64_t now_ms, struct session *cleaned_sess_ptr[], uint64_t array_size);
uint64_t session_manager_rte_scan_session(struct session_manager_rte *sess_mgr_rte, const struct session_filter *filter, uint64_t mached_sess_id[], uint64_t array_size);
void session_manager_rte_record_duplicated_packet(struct session_manager_rte *sess_mgr_rte, const struct packet *pkt);
diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c
index fae6e77..be2d392 100644
--- a/infra/session_manager/session_utils.c
+++ b/infra/session_manager/session_utils.c
@@ -5,9 +5,6 @@
void session_init(struct session *sess)
{
memset(sess, 0, sizeof(struct session));
- sess->empty_seg.data = NULL;
- sess->empty_seg.len = 0;
- sess->empty_seg.user_data = sess;
}
void session_set_id(struct session *sess, uint64_t id)
@@ -212,7 +209,6 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
{
sess->sess_mgr_stat->tcp_segs_consumed++;
half->inorder_seg_consumed = 1;
- half->inorder_seg.user_data = sess;
return &half->inorder_seg;
}
else
@@ -226,7 +222,6 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
// TODO
sess->sess_mgr_stat->tcp_segs_consumed++;
sess->sess_mgr_stat->tcp_segs_reordered++;
- seg->user_data = sess;
}
return seg;
}
@@ -239,12 +234,6 @@ void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg)
return;
}
- // empty segment for end of session
- if (seg == &sess->empty_seg)
- {
- return;
- }
-
enum flow_type type = session_get_flow_type(sess);
struct tcp_half *half = &sess->tcp_halfs[type];
diff --git a/infra/session_manager/test/CMakeLists.txt b/infra/session_manager/test/CMakeLists.txt
index e0e7ede..2b346d9 100644
--- a/infra/session_manager/test/CMakeLists.txt
+++ b/infra/session_manager/test/CMakeLists.txt
@@ -149,4 +149,6 @@ gtest_discover_tests(gtest_session_transition)
gtest_discover_tests(gtest_sess_mgr_tcp_reassembly)
gtest_discover_tests(gtest_sess_mgr_scan)
-gtest_discover_tests(gtest_case_tcp_fast_open) \ No newline at end of file
+gtest_discover_tests(gtest_case_tcp_fast_open)
+
+file(COPY ./metrics/ DESTINATION ./metrics/) \ No newline at end of file
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index bdb730e..6272241 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -95,6 +95,8 @@ static void *worker_thread(void *arg)
}
}
+ CORE_LOG_FATAL("worker thread %d cleaning", thread_id);
+
module_manager_unregister_thread(mod_mgr, thread_id);
mq_runtime_free(mq_rt);
diff --git a/infra/tcp_reassembly/tcp_reassembly.h b/infra/tcp_reassembly/tcp_reassembly.h
index 87660c4..50e369b 100644
--- a/infra/tcp_reassembly/tcp_reassembly.h
+++ b/infra/tcp_reassembly/tcp_reassembly.h
@@ -12,7 +12,6 @@ struct tcp_segment
{
uint32_t len;
const void *data;
- void *user_data;
};
struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len);
diff --git a/infra/utils_internal.h b/infra/utils_internal.h
index c026418..b2deb07 100644
--- a/infra/utils_internal.h
+++ b/infra/utils_internal.h
@@ -17,7 +17,7 @@ extern "C"
#define RX_BURST_MAX 32
#define MAX_THREAD_NUM 256 // limit by snowflake
-#define SYNC_STAT_INTERVAL_MS 1000
+#define SYNC_STAT_INTERVAL_MS 1 // TODO
#define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED)
#define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED)
diff --git a/infra/version.map b/infra/version.map
index 8560564..47a20b9 100644
--- a/infra/version.map
+++ b/infra/version.map
@@ -25,12 +25,13 @@ global:
packet_manager_build_tcp_packet;
packet_manager_build_udp_packet;
packet_manager_build_l3_packet;
+ packet_manager_dup_packet;
+ packet_manager_free_packet;
session_is_symmetric;
session_has_duplicate_traffic;
session_get_type;
session_get_current_state;
- session_get_current_packet;
session_get_closing_reason;
session_get_direction;
session_get_flow_type;
@@ -47,10 +48,7 @@ global:
session_manager_on_thread_init;
session_manager_on_thread_exit;
session_manager_new_session_exdata_index;
- session_manager_subscribe_tcp;
- session_manager_subscribe_udp;
- session_manager_subscribe_control_packet;
- session_manager_subscribe_tcp_stream;
+ packet_exdata_to_session;
session_monitor_on_init;
session_monitor_on_exit;
diff --git a/scripts/stat_format.sh b/scripts/stat_format.sh
index 999e839..05bcf08 100644
--- a/scripts/stat_format.sh
+++ b/scripts/stat_format.sh
@@ -6,4 +6,5 @@ if [ $# -ne 1 ]; then
fi
f4_json_file=$1
+# python3 -m pip install prettytable jinja2
/opt/MESA/bin/fieldstat_exporter.py local -j $f4_json_file -l --clear-screen
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 11d1abc..5f93a2e 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,7 +1,7 @@
#add_subdirectory(packet_inject)
add_subdirectory(packet_tool)
-add_subdirectory(session_debugger)
-add_subdirectory(lpi_plus)
+#add_subdirectory(session_debugger)
+#add_subdirectory(lpi_plus)
#add_subdirectory(decoders/http)
#add_subdirectory(decoders/socks)
#add_subdirectory(decoders/stratum)
diff --git a/test/monitor/CMakeLists.txt b/test/monitor/CMakeLists.txt
index 5f447da..8b5ca5f 100644
--- a/test/monitor/CMakeLists.txt
+++ b/test/monitor/CMakeLists.txt
@@ -26,7 +26,8 @@ foreach(tfile ${MONITOR_TEST_FILE})
endforeach()
set(MONITOR_TEST_RUN_DIR ${CMAKE_CURRENT_BINARY_DIR})
-add_test(NAME MONITOR_ENV_SETUP COMMAND sh -c "mkdir -p ${MONITOR_TEST_RUN_DIR}/conf &&
+add_test(NAME MONITOR_ENV_SETUP COMMAND sh -c "mkdir -p ${MONITOR_TEST_RUN_DIR}/metrics &&
+ mkdir -p ${MONITOR_TEST_RUN_DIR}/conf &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/plugin &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/log &&
mkdir -p ${MONITOR_TEST_RUN_DIR}/pcap &&