summaryrefslogtreecommitdiff
path: root/infra
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-10-25 19:15:28 +0800
committerluwenpeng <[email protected]>2024-10-25 19:15:45 +0800
commit03864c9731ec87ed6c7e1fbe3c5f261880e857dc (patch)
tree2b2343f491faf890889411b1a69a3f40b956e58f /infra
parent4061d5a942392c4716586ded8c586440d2e920ca (diff)
Add state to the on_session_message parameter.
* When state is closed, it implies that packet is null and the session will be destroyed
Diffstat (limited to 'infra')
-rw-r--r--infra/session_manager/session_internal.h6
-rw-r--r--infra/session_manager/session_manager.c146
-rw-r--r--infra/session_manager/session_manager_runtime.c12
-rw-r--r--infra/session_manager/session_utils.c25
-rw-r--r--infra/version.map28
5 files changed, 116 insertions, 101 deletions
diff --git a/infra/session_manager/session_internal.h b/infra/session_manager/session_internal.h
index b117ba0..51413dc 100644
--- a/infra/session_manager/session_internal.h
+++ b/infra/session_manager/session_internal.h
@@ -21,9 +21,8 @@ extern "C"
struct tcp_half
{
struct tcp_reassembly *tcp_reass;
- struct tcp_segment in_order; // current packet in order segment
- uint32_t in_order_ref; // reference count of current packet in order segment
-
+ struct tcp_segment inorder_seg; // current packet in order segment
+ uint32_t inorder_seg_consumed;
uint32_t seq; // current packet sequence number
uint32_t ack; // current packet ack number
uint16_t len; // current packet payload length
@@ -48,6 +47,7 @@ struct session
uint64_t timestamps[MAX_TIMESTAMP]; // realtime msec
struct tcp_half tcp_halfs[MAX_FLOW_TYPE];
struct timeout timeout;
+ struct tcp_segment empty_seg;
TAILQ_ENTRY(session) lru_tqe;
TAILQ_ENTRY(session) free_tqe;
TAILQ_ENTRY(session) evicte_tqe;
diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c
index 3aa9af4..7fd0805 100644
--- a/infra/session_manager/session_manager.c
+++ b/infra/session_manager/session_manager.c
@@ -21,7 +21,6 @@ struct session_manager_schema
int pkt_exdata_idx;
- int topic_id_free;
int topic_id_tcp;
int topic_id_udp;
int topic_id_ctrl_pkt;
@@ -40,28 +39,25 @@ struct session_manager
* callback
******************************************************************************/
-static void on_session_free_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
-{
- struct session *sess = (struct session *)msg;
- ((on_session_free_callback *)(void *)cb)(sess, cb_args);
-}
-
-static void on_session_packet_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
+static void on_session_message_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
{
struct session *sess = (struct session *)msg;
struct packet *pkt = (struct packet *)session_get0_current_packet(sess);
+ enum session_state state = session_get_current_state(sess);
- ((on_session_packet_callback *)(void *)cb)(sess, pkt, cb_args);
+ ((on_session_message_callback *)(void *)cb)(sess, state, pkt, cb_args);
}
-static void on_tcp_stream_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
+static void on_tcp_payload_message_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
{
struct tcp_segment *seg = (struct tcp_segment *)msg;
+ struct session *sess = (struct session *)seg->user_data;
+ enum session_state state = session_get_current_state(sess);
- ((on_tcp_stream_callback *)(void *)cb)(seg->user_data, seg->data, seg->len, cb_args);
+ ((on_tcp_payload_callback *)(void *)cb)(sess, state, seg->data, seg->len, cb_args);
}
-static void on_tcp_stream_free(void *msg, void *args)
+static void on_tcp_payload_message_free(void *msg, void *args)
{
struct tcp_segment *seg = (struct tcp_segment *)msg;
struct session *sess = (struct session *)seg->user_data;
@@ -69,29 +65,31 @@ static void on_tcp_stream_free(void *msg, void *args)
session_free_tcp_segment(sess, seg);
}
-static void on_session_free(void *msg, void *args)
+static void on_session_message_free(void *msg, void *args)
{
struct session *sess = (struct session *)msg;
- struct session_manager *sess_mgr = (struct session_manager *)args;
- struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
- int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
- struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
- char buffer[4096] = {0};
- session_to_str(sess, 0, buffer, sizeof(buffer));
- SESSION_MANAGER_LOG_INFO("session free: %s", buffer);
+ if (session_get_current_state(sess) == SESSION_STATE_CLOSED)
+ {
+ struct session_manager *sess_mgr = (struct session_manager *)args;
+ int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr);
+ struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
+
+ char buffer[4096] = {0};
+ session_to_str(sess, 0, buffer, sizeof(buffer));
+ SESSION_MANAGER_LOG_INFO("session free: %s", buffer);
- struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
- exdata_runtime_free(exdata_rt);
- session_manager_runtime_free_session(sess_mgr_rt, sess);
+ struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
+ exdata_runtime_free(exdata_rt);
+ session_manager_runtime_free_session(sess_mgr_rt, sess);
+ }
}
static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
- struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
- int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
- struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
+ int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr);
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
/*
@@ -181,8 +179,7 @@ fast_path:
static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
- struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
- int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
+ int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx);
@@ -227,24 +224,44 @@ static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *
}
}
-static void on_polling(struct stellar_module_manager *mod_mgr, void *args)
+static void publish_session_closed_message_batch(struct session_manager *sess_mgr, uint16_t thread_id, uint64_t now_ms)
{
- uint64_t now_ms = clock_get_real_time_ms();
+ struct session *sess = NULL;
struct session *cleaned[MAX_CLEANED_SESS] = {NULL};
- struct session_manager *sess_mgr = (struct session_manager *)args;
- int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
- struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr);
uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned, MAX_CLEANED_SESS);
for (uint64_t i = 0; i < used; i++)
{
- mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]);
+ sess = cleaned[i];
+ assert(session_get_current_state(sess) == SESSION_STATE_CLOSED);
+ session_set_current_packet(sess, NULL);
+ session_set_flow_type(sess, FLOW_TYPE_NONE);
+
+ if (session_get_type(sess) == SESSION_TYPE_TCP)
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp_stream, &sess->empty_seg);
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess);
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp, sess);
+ }
+ else
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess);
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess);
+ }
}
+}
+
+static void on_polling(struct stellar_module_manager *mod_mgr, void *args)
+{
+ uint64_t now_ms = clock_get_real_time_ms();
+ struct session_manager *sess_mgr = (struct session_manager *)args;
+ int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
+ publish_session_closed_message_batch(sess_mgr, thread_id, now_ms);
// TODO
// ouput stat to fs4
- // session_manager_runtime_print_stat(sess_mgr_rt);
}
/******************************************************************************
@@ -257,7 +274,6 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema)
{
if (sess_mgr_schema->mq)
{
- mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_free);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_udp);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_ctrl_pkt);
@@ -270,14 +286,14 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema)
}
}
-struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *subscribe_args)
+struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *sess_mgr)
{
- if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, subscribe_args))
+ if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
return NULL;
}
- if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, subscribe_args))
+ if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
return NULL;
@@ -305,31 +321,34 @@ struct session_manager_schema *session_manager_schema_new(struct packet_manager
goto error_out;
}
- sess_mgr_schema->topic_id_free = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_FREE", &on_session_free_dispatch, NULL, &on_session_free, subscribe_args);
- if (sess_mgr_schema->topic_id_free == -1)
- {
- SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE");
- goto error_out;
- }
- sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_packet_dispatch, NULL, NULL, NULL);
+ /*
+ * Publish session closed messages to multiple topics.
+ * Each topic has its own session message free callback.
+ * To prevent the same session from being freeed multiple times,
+ * only TCP/UDP topics register session message free callbacks,
+ * and other topics do not register session message callbacks;
+ *
+ * Restriction: MQ ensures that the session message free order is consistent with the publishing order
+ */
+ sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_message_dispatch, NULL, &on_session_message_free, sess_mgr);
if (sess_mgr_schema->topic_id_tcp == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE");
goto error_out;
}
- sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_packet_dispatch, NULL, NULL, NULL);
+ sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_message_dispatch, NULL, &on_session_message_free, sess_mgr);
if (sess_mgr_schema->topic_id_udp == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_UDP");
goto error_out;
}
- sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_packet_dispatch, NULL, NULL, NULL);
+ sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_message_dispatch, NULL, NULL, NULL);
if (sess_mgr_schema->topic_id_ctrl_pkt == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_CTRL_PKT");
goto error_out;
}
- sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL);
+ sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_payload_message_dispatch, NULL, &on_tcp_payload_message_free, sess_mgr);
if (sess_mgr_schema->topic_id_tcp_stream == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_TCP_STREAM");
@@ -397,15 +416,7 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c
return exdata_schema_new_index(sess_mgr->schema->exdata, name, func, arg);
}
-int session_manager_subscribe_free(struct session_manager *sess_mgr, on_session_free_callback *cb, void *args)
-{
- assert(sess_mgr);
- assert(cb);
-
- return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_free, (on_msg_cb_func *)(void *)cb, args);
-}
-
-int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
+int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -413,7 +424,7 @@ int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_p
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp, (on_msg_cb_func *)(void *)cb, args);
}
-int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
+int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -421,7 +432,7 @@ int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_p
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_udp, (on_msg_cb_func *)(void *)cb, args);
}
-int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
+int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -429,7 +440,7 @@ int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, o
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_ctrl_pkt, (on_msg_cb_func *)(void *)cb, args);
}
-int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args)
+int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -462,8 +473,7 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
assert(sess_mgr);
assert(thread_id < sess_mgr->cfg->thread_num);
- struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
- struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
if (sess_mgr_rt == NULL)
{
@@ -473,13 +483,9 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
struct session_manager_stat *stat = session_manager_runtime_get_stat(sess_mgr_rt);
while (stat->tcp_sess_used || stat->udp_sess_used)
{
- struct session *cleaned[MAX_CLEANED_SESS] = {NULL};
- uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, UINT64_MAX, cleaned, MAX_CLEANED_SESS);
- for (uint64_t i = 0; i < used; i++)
- {
- mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]);
- mq_runtime_dispatch(mq_rt);
- }
+ publish_session_closed_message_batch(sess_mgr, thread_id, UINT64_MAX);
+ // here we need to dispatch the message to ensure that the session is cleaned up
+ mq_runtime_dispatch(mq_rt);
}
SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, thread_id);
diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c
index 9c4f123..97fbb28 100644
--- a/infra/session_manager/session_manager_runtime.c
+++ b/infra/session_manager/session_manager_runtime.c
@@ -243,9 +243,9 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi
session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len);
sess_mgr_rt->stat.tcp_segs_inorder++;
- half->in_order.data = tcp_layer->pld_ptr;
- half->in_order.len = len;
- half->in_order_ref = 0;
+ half->inorder_seg.data = tcp_layer->pld_ptr;
+ half->inorder_seg.len = len;
+ half->inorder_seg_consumed = 0;
}
return;
}
@@ -284,9 +284,9 @@ static void tcp_update(struct session_manager_runtime *sess_mgr_rt, struct sessi
session_inc_stat(sess, type, STAT_TCP_PAYLOADS_INORDER, len);
sess_mgr_rt->stat.tcp_segs_inorder++;
- half->in_order.data = tcp_layer->pld_ptr;
- half->in_order.len = len;
- half->in_order_ref = 0;
+ half->inorder_seg.data = tcp_layer->pld_ptr;
+ half->inorder_seg.len = len;
+ half->inorder_seg_consumed = 0;
tcp_reassembly_inc_recv_next(half->tcp_reass, len);
}
// retransmission
diff --git a/infra/session_manager/session_utils.c b/infra/session_manager/session_utils.c
index 630cf59..801ddd4 100644
--- a/infra/session_manager/session_utils.c
+++ b/infra/session_manager/session_utils.c
@@ -5,6 +5,9 @@
void session_init(struct session *sess)
{
memset(sess, 0, sizeof(struct session));
+ sess->empty_seg.data = NULL;
+ sess->empty_seg.len = 0;
+ sess->empty_seg.user_data = sess;
}
void session_set_id(struct session *sess, uint64_t id)
@@ -205,12 +208,12 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
enum flow_type type = session_get_flow_type(sess);
struct tcp_half *half = &sess->tcp_halfs[type];
- if (half->in_order.data != NULL && half->in_order.len > 0 && half->in_order_ref == 0)
+ if (half->inorder_seg.data != NULL && half->inorder_seg.len > 0 && !half->inorder_seg_consumed)
{
sess->sess_mgr_stat->tcp_segs_consumed++;
- half->in_order_ref++;
- half->in_order.user_data = sess;
- return &half->in_order;
+ half->inorder_seg_consumed = 1;
+ half->inorder_seg.user_data = sess;
+ return &half->inorder_seg;
}
else
{
@@ -236,15 +239,23 @@ void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg)
return;
}
+ // empty segment for end of session
+ if (seg == &sess->empty_seg)
+ {
+ return;
+ }
+
enum flow_type type = session_get_flow_type(sess);
struct tcp_half *half = &sess->tcp_halfs[type];
- if (seg == &half->in_order)
+ // in order segment
+ if (seg == &half->inorder_seg)
{
- half->in_order.data = NULL;
- half->in_order.len = 0;
+ half->inorder_seg.data = NULL;
+ half->inorder_seg.len = 0;
return;
}
+ // tcp reassembly segment
else
{
session_inc_stat(sess, type, STAT_TCP_SEGMENTS_RELEASED, 1);
diff --git a/infra/version.map b/infra/version.map
index 9f1ee3a..9941f31 100644
--- a/infra/version.map
+++ b/infra/version.map
@@ -24,10 +24,6 @@ global:
packet_manager_claim_packet;
packet_manager_schedule_packet;
- exdata_*;
- mq_*;
- stellar_module_*;
-
session_is_symmetric;
session_has_duplicate_traffic;
session_get_type;
@@ -44,6 +40,19 @@ global:
session_set_discard;
session_get_exdata;
session_set_exdata;
+ session_manager_on_init;
+ session_manager_on_exit;
+ session_manager_on_thread_init;
+ session_manager_on_thread_exit;
+ session_manager_new_session_exdata_index;
+ session_manager_subscribe_tcp;
+ session_manager_subscribe_udp;
+ session_manager_subscribe_control_packet;
+ session_manager_subscribe_tcp_stream;
+
+ exdata_*;
+ mq_*;
+ stellar_module_*;
stellar_new;
stellar_run;
@@ -58,17 +67,6 @@ global:
log_print;
log_check_level;
- session_manager_on_init;
- session_manager_on_exit;
- session_manager_on_thread_init;
- session_manager_on_thread_exit;
- session_manager_new_session_exdata_index;
- session_manager_subscribe_free;
- session_manager_subscribe_tcp;
- session_manager_subscribe_udp;
- session_manager_subscribe_control_packet;
- session_manager_subscribe_tcp_stream;
-
http_message_*;
http_decoder_init;
http_decoder_exit;