summaryrefslogtreecommitdiff
path: root/infra/session_manager
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-10-23 14:19:06 +0800
committerluwenpeng <[email protected]>2024-10-23 14:19:37 +0800
commit3d4e6a2cd51ce933d1d182246b3fb5277c91d665 (patch)
tree1b954a01ca9f4eb84a1fa8555c2b6e56ad58e80f /infra/session_manager
parent99871899578ccc01e95b25347ea29a4e96baee6d (diff)
feature: session manager support subscribe session free message
Diffstat (limited to 'infra/session_manager')
-rw-r--r--infra/session_manager/session_manager.c111
1 files changed, 70 insertions, 41 deletions
diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c
index a6b7929..64ebbc1 100644
--- a/infra/session_manager/session_manager.c
+++ b/infra/session_manager/session_manager.c
@@ -10,6 +10,7 @@
#include "session_internal.h"
#include "session_manager_runtime.h"
+#define MAX_CLEANED_SESS 1024
#define SESSION_MANAGER_MODULE_NAME "session_manager_module"
#pragma GCC diagnostic ignored "-Wunused-parameter"
@@ -22,6 +23,7 @@ struct session_manager_schema
int pkt_exdata_idx;
+ int topic_id_free;
int topic_id_tcp;
int topic_id_udp;
int topic_id_ctrl_pkt;
@@ -40,12 +42,18 @@ struct session_manager
* callback
******************************************************************************/
-static void on_session_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
+static void on_session_free_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
+{
+ struct session *sess = (struct session *)msg;
+ ((on_session_free_callback *)(void *)cb)(sess, cb_args);
+}
+
+static void on_session_packet_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
{
struct session *sess = (struct session *)msg;
struct packet *pkt = (struct packet *)session_get0_current_packet(sess);
- ((on_session_callback *)(void *)cb)(sess, pkt, cb_args);
+ ((on_session_packet_callback *)(void *)cb)(sess, pkt, cb_args);
}
static void on_tcp_stream_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
@@ -63,6 +71,23 @@ static void on_tcp_stream_free(void *msg, void *args)
session_free_tcp_segment(sess, seg);
}
+static void on_session_free(void *msg, void *args)
+{
+ struct session *sess = (struct session *)msg;
+ struct session_manager *sess_mgr = (struct session_manager *)args;
+ struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
+ int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
+ struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
+
+ char buffer[4096] = {0};
+ session_to_str(sess, 0, buffer, sizeof(buffer));
+ SESSION_MANAGER_LOG_INFO("session free: %s", buffer);
+
+ struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
+ exdata_runtime_free(exdata_rt);
+ session_manager_runtime_free_session(sess_mgr_rt, sess);
+}
+
static void on_packet_forward(enum packet_stage stage, struct packet *pkt, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
@@ -166,39 +191,20 @@ static void on_packet_output(enum packet_stage stage, struct packet *pkt, void *
}
}
-static void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t now_ms)
-{
-#define MAX_CLEANED_SESS 1024
- char buffer[4096] = {0};
- struct session *sess = NULL;
- struct session *cleaned_sess[MAX_CLEANED_SESS] = {NULL};
-
- uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned_sess, MAX_CLEANED_SESS);
- for (uint64_t j = 0; j < used; j++)
- {
- sess = cleaned_sess[j];
-
- session_to_str(sess, 0, buffer, sizeof(buffer));
- SESSION_MANAGER_LOG_INFO("session free: %s", buffer);
-
- // TODO publish session free msg
- // TODO mq_runtime_dispatch_immediate()
-
- struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
- exdata_runtime_free(exdata_rt);
-
- session_manager_runtime_free_session(sess_mgr_rt, sess);
- }
-}
-
static void on_polling(struct stellar_module_manager *mod_mgr, void *args)
{
+ uint64_t now_ms = clock_get_real_time_ms();
+ struct session *cleaned[MAX_CLEANED_SESS] = {NULL};
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
- uint64_t now_ms = clock_get_real_time_ms();
- clean_session(sess_mgr_rt, now_ms);
+ uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned, MAX_CLEANED_SESS);
+ for (uint64_t i = 0; i < used; i++)
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]);
+ }
// TODO
// ouput stat to fs4
@@ -215,6 +221,7 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema)
{
if (sess_mgr_schema->mq)
{
+ mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_free);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_udp);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_ctrl_pkt);
@@ -262,28 +269,34 @@ struct session_manager_schema *session_manager_schema_new(struct packet_manager
goto error_out;
}
- sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "TCP", &on_session_dispatch, NULL, NULL, NULL);
+ sess_mgr_schema->topic_id_free = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_FREE", &on_session_free_dispatch, NULL, &on_session_free, subscribe_args);
+ if (sess_mgr_schema->topic_id_free == -1)
+ {
+ SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE");
+ goto error_out;
+ }
+ sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_packet_dispatch, NULL, NULL, NULL);
if (sess_mgr_schema->topic_id_tcp == -1)
{
- SESSION_MANAGER_LOG_ERROR("failed to create topic TCP");
+ SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE");
goto error_out;
}
- sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "UDP", &on_session_dispatch, NULL, NULL, NULL);
+ sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_packet_dispatch, NULL, NULL, NULL);
if (sess_mgr_schema->topic_id_udp == -1)
{
- SESSION_MANAGER_LOG_ERROR("failed to create topic UDP");
+ SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_UDP");
goto error_out;
}
- sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "CTRL_PKT", &on_session_dispatch, NULL, NULL, NULL);
+ sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_packet_dispatch, NULL, NULL, NULL);
if (sess_mgr_schema->topic_id_ctrl_pkt == -1)
{
- SESSION_MANAGER_LOG_ERROR("failed to create topic CTRL_PKT");
+ SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_CTRL_PKT");
goto error_out;
}
- sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL);
+ sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL);
if (sess_mgr_schema->topic_id_tcp_stream == -1)
{
- SESSION_MANAGER_LOG_ERROR("failed to create topic TCP_STREAM");
+ SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_TCP_STREAM");
goto error_out;
}
@@ -348,7 +361,15 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c
return exdata_schema_new_index(sess_mgr->schema->exdata, name, func, arg);
}
-int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_callback *cb, void *args)
+int session_manager_subscribe_free(struct session_manager *sess_mgr, on_session_free_callback *cb, void *args)
+{
+ assert(sess_mgr);
+ assert(cb);
+
+ return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_free, (on_msg_cb_func *)(void *)cb, args);
+}
+
+int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -356,7 +377,7 @@ int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_c
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp, (on_msg_cb_func *)(void *)cb, args);
}
-int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_callback *cb, void *args)
+int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -364,7 +385,7 @@ int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_c
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_udp, (on_msg_cb_func *)(void *)cb, args);
}
-int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_callback *cb, void *args)
+int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -405,6 +426,8 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
assert(sess_mgr);
assert(thread_id < sess_mgr->cfg->thread_num);
+ struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
if (sess_mgr_rt == NULL)
{
@@ -414,7 +437,13 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
struct session_manager_stat *stat = session_manager_runtime_get_stat(sess_mgr_rt);
while (stat->tcp_sess_used || stat->udp_sess_used)
{
- clean_session(sess_mgr_rt, UINT64_MAX);
+ struct session *cleaned[MAX_CLEANED_SESS] = {NULL};
+ uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, UINT64_MAX, cleaned, MAX_CLEANED_SESS);
+ for (uint64_t i = 0; i < used; i++)
+ {
+ mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]);
+ mq_runtime_dispatch(mq_rt);
+ }
}
SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, thread_id);