summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-10-09 16:26:42 +0800
committerluwenpeng <[email protected]>2024-10-10 09:55:00 +0800
commit2e35a795285499e064b91435ad90777f90419f68 (patch)
tree38755f7964e004b32233b7edde609ef18e7d6d85
parentc5d7208c11664609f4e1655d65feb44e025b2cdb (diff)
Export the packet manager runtime API for easier testing
-rw-r--r--infra/packet_manager/CMakeLists.txt1
-rw-r--r--infra/packet_manager/packet_manager.c189
-rw-r--r--infra/packet_manager/packet_manager_internal.h21
-rw-r--r--infra/packet_manager/packet_manager_runtime.c173
-rw-r--r--infra/packet_manager/packet_manager_runtime.h69
-rw-r--r--infra/packet_manager/test/gtest_packet_manager.cpp2
-rw-r--r--infra/session_manager/session_manager.c1
-rw-r--r--infra/session_manager/session_manager_runtime.c14
-rw-r--r--infra/stellar_core.c3
-rw-r--r--test/session_debugger/session_debugger.c4
10 files changed, 272 insertions, 205 deletions
diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt
index 9b9c132..41af01b 100644
--- a/infra/packet_manager/CMakeLists.txt
+++ b/infra/packet_manager/CMakeLists.txt
@@ -1,4 +1,5 @@
add_library(packet_manager
+ packet_manager_runtime.c
packet_manager.c
packet_parser.c
packet_builder.c
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c
index 7e903a6..eba87ec 100644
--- a/infra/packet_manager/packet_manager.c
+++ b/infra/packet_manager/packet_manager.c
@@ -1,17 +1,10 @@
#include <assert.h>
-#include "stellar/mq.h"
-#include "stellar/module_manager.h"
-
#include "utils.h"
#include "packet_internal.h"
+#include "packet_manager_runtime.h"
#include "packet_manager_internal.h"
-
-#define PACKET_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
-#define PACKET_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
-#define PACKET_MANAGER_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
-
-TAILQ_HEAD(packet_queue, packet);
+#include "stellar/module_manager.h"
struct packet_manager_config
{
@@ -25,18 +18,6 @@ struct packet_manager_schema
int topic_id[PACKET_STAGE_MAX];
};
-struct packet_manager_runtime
-{
- enum packet_stage stage;
- struct packet_queue queue[PACKET_QUEUE_MAX];
-
- void *cb_args;
- on_packet_claimed_callback *claimed_cb;
-
- struct mq_runtime *mq;
- struct packet_manager_stat stat;
-};
-
struct packet_manager
{
struct packet_manager_config *cfg;
@@ -45,29 +26,6 @@ struct packet_manager
};
/******************************************************************************
- * packet stage
- ******************************************************************************/
-
-const char *packet_stage_to_str(enum packet_stage stage)
-{
- switch (stage)
- {
- case PACKET_STAGE_PREROUTING:
- return "PACKET_STAGE_PREROUTING";
- case PACKET_STAGE_INPUT:
- return "PACKET_STAGE_INPUT";
- case PACKET_STAGE_FORWARD:
- return "PACKET_STAGE_FORWARD";
- case PACKET_STAGE_OUTPUT:
- return "PACKET_STAGE_OUTPUT";
- case PACKET_STAGE_POSTROUTING:
- return "PACKET_STAGE_POSTROUTING";
- default:
- return "PACKET_STAGE_UNKNOWN";
- }
-}
-
-/******************************************************************************
* packet manager config
******************************************************************************/
@@ -186,47 +144,6 @@ error_out:
}
/******************************************************************************
- * packet manager runtime
- ******************************************************************************/
-
-static void packet_manager_runtime_free(struct packet_manager_runtime *runtime)
-{
- if (runtime)
- {
- for (int i = 0; i < PACKET_QUEUE_MAX; i++)
- {
- struct packet *pkt = NULL;
- while ((pkt = TAILQ_FIRST(&runtime->queue[i])))
- {
- TAILQ_REMOVE(&runtime->queue[i], pkt, stage_tqe);
-
- // TODO: free packet and free mbuff
- packet_free(pkt);
- }
- }
- }
- free(runtime);
- runtime = NULL;
-}
-
-static struct packet_manager_runtime *packet_manager_runtime_new()
-{
- struct packet_manager_runtime *runtime = calloc(1, sizeof(struct packet_manager_runtime));
- if (runtime == NULL)
- {
- PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager_runtime");
- return NULL;
- }
-
- for (int i = 0; i < PACKET_QUEUE_MAX; i++)
- {
- TAILQ_INIT(&runtime->queue[i]);
- }
-
- return runtime;
-}
-
-/******************************************************************************
* packet manager
******************************************************************************/
@@ -272,15 +189,20 @@ error_out:
void packet_manager_free(struct packet_manager *pkt_mgr)
{
+ struct packet_manager_runtime *pkt_mgr_rt = NULL;
+
if (pkt_mgr)
{
if (pkt_mgr->cfg)
{
for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++)
{
- if (pkt_mgr->runtime[i])
+ pkt_mgr_rt = pkt_mgr->runtime[i];
+ if (pkt_mgr_rt)
{
- packet_manager_runtime_free(pkt_mgr->runtime[i]);
+ PACKET_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", pkt_mgr_rt, i);
+ packet_manager_runtime_print_stat(pkt_mgr_rt);
+ packet_manager_runtime_free(pkt_mgr_rt);
}
}
}
@@ -310,7 +232,7 @@ void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, str
assert(mq_rt);
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
- runtime->mq = mq_rt;
+ packet_manager_runtime_init(runtime, mq_rt);
}
void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt)
@@ -319,22 +241,15 @@ void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id,
struct exdata_runtime *exdata_rt = exdata_runtime_new(pkt_mgr->schema->exdata);
packet_set_user_data(pkt, exdata_rt);
- runtime->stat.total.pkts_ingress++;
- runtime->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++;
- TAILQ_INSERT_TAIL(&runtime->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe);
+ packet_manager_runtime_ingress(runtime, pkt);
}
struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
-
- struct packet *pkt = TAILQ_FIRST(&runtime->queue[PACKET_STAGE_MAX]);
+ struct packet *pkt = packet_manager_runtime_egress(runtime);
if (pkt)
{
- runtime->stat.total.pkts_egress++;
- runtime->stat.queue[PACKET_STAGE_MAX].pkts_out++;
- TAILQ_REMOVE(&runtime->queue[PACKET_STAGE_MAX], pkt, stage_tqe);
-
struct exdata_runtime *exdata_rt = packet_get_user_data(pkt);
exdata_runtime_free(exdata_rt);
}
@@ -344,101 +259,31 @@ struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t th
void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
-
- for (int i = 0; i < PACKET_STAGE_MAX; i++)
- {
- runtime->stage = i;
- packet_manager_print_stat(pkt_mgr, thread_id);
-
- struct packet *pkt = NULL;
- while ((pkt = TAILQ_FIRST(&runtime->queue[runtime->stage])))
- {
- packet_set_claim(pkt, false);
- runtime->claimed_cb = NULL;
- runtime->cb_args = NULL;
-
- TAILQ_REMOVE(&runtime->queue[runtime->stage], pkt, stage_tqe);
- runtime->stat.queue[runtime->stage].pkts_out++;
-
- mq_runtime_publish_message(runtime->mq, runtime->stage, pkt);
- mq_runtime_dispatch(runtime->mq);
-
- if (packet_is_claim(pkt))
- {
- if (runtime->claimed_cb)
- {
- runtime->claimed_cb(pkt, runtime->cb_args);
- }
- packet_set_claim(pkt, false);
- continue;
- }
-
- TAILQ_INSERT_TAIL(&runtime->queue[runtime->stage + 1], pkt, stage_tqe);
- runtime->stat.queue[runtime->stage + 1].pkts_in++;
- }
- }
- runtime->stage = -1;
+ packet_manager_runtime_dispatch(runtime);
}
int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_claimed_callback cb, void *args)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
-
- if (packet_is_claim(pkt))
- {
- PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again");
- return -1;
- }
- else
- {
- runtime->claimed_cb = cb;
- runtime->cb_args = args;
- packet_set_claim(pkt, true);
- runtime->stat.queue[runtime->stage].pkts_claim++;
- return 0;
- }
+ return packet_manager_runtime_claim_packet(runtime, pkt, cb, args);
}
void packet_manager_schedule_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, enum packet_stage stage)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
-
- if (stage >= PACKET_STAGE_MAX)
- {
- PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage);
- assert(0);
- return;
- }
-
- runtime->stat.queue[stage].pkts_schedule++;
- runtime->stat.queue[stage].pkts_in++;
- TAILQ_INSERT_TAIL(&runtime->queue[stage], pkt, stage_tqe);
+ packet_manager_runtime_schedule_packet(runtime, pkt, stage);
}
struct packet_manager_stat *packet_manager_get_stat(struct packet_manager *pkt_mgr, uint16_t thread_id)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
-
- return &runtime->stat;
+ return packet_manager_runtime_get_stat(runtime);
}
void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_id)
{
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
-
- PACKET_MANAGER_LOG_DEBUG("runtime[%p] current stage: %s, pkts_ingress: %lu, pkts_egress: %lu",
- runtime, packet_stage_to_str(runtime->stage),
- runtime->stat.total.pkts_ingress, runtime->stat.total.pkts_egress);
- for (int i = 0; i < PACKET_QUEUE_MAX; i++)
- {
- PACKET_MANAGER_LOG_DEBUG("runtime[%p] (%11s) queue stat => pkts_in: %lu, pkts_out: %lu, pkts_claim: %lu, pkts_schedule: %lu",
- runtime,
- packet_stage_to_str(i),
- runtime->stat.queue[i].pkts_in,
- runtime->stat.queue[i].pkts_out,
- runtime->stat.queue[i].pkts_claim,
- runtime->stat.queue[i].pkts_schedule);
- }
+ packet_manager_runtime_print_stat(runtime);
}
/******************************************************************************
diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h
index 13bdc3b..5264d4b 100644
--- a/infra/packet_manager/packet_manager_internal.h
+++ b/infra/packet_manager/packet_manager_internal.h
@@ -8,24 +8,6 @@ extern "C"
#include "stellar/mq.h"
#include "stellar/packet_manager.h"
-#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1)
-
-struct packet_manager_stat
-{
- struct
- {
- uint64_t pkts_ingress;
- uint64_t pkts_egress;
- } total;
- struct
- {
- uint64_t pkts_in; // include the packets that are scheduled
- uint64_t pkts_out; // include the packets that are claimed
- uint64_t pkts_claim;
- uint64_t pkts_schedule;
- } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets
-};
-
struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file);
void packet_manager_free(struct packet_manager *pkt_mgr);
@@ -33,11 +15,10 @@ void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, str
void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt);
struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id);
void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id);
+
struct packet_manager_stat *packet_manager_get_stat(struct packet_manager *pkt_mgr, uint16_t thread_id);
void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_id);
-const char *packet_stage_to_str(enum packet_stage stage);
-
#ifdef __cplusplus
}
#endif
diff --git a/infra/packet_manager/packet_manager_runtime.c b/infra/packet_manager/packet_manager_runtime.c
new file mode 100644
index 0000000..c7e6dcb
--- /dev/null
+++ b/infra/packet_manager/packet_manager_runtime.c
@@ -0,0 +1,173 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "packet_internal.h"
+#include "packet_manager_runtime.h"
+
+const char *packet_stage_to_str(enum packet_stage stage)
+{
+ switch (stage)
+ {
+ case PACKET_STAGE_PREROUTING:
+ return "PACKET_STAGE_PREROUTING";
+ case PACKET_STAGE_INPUT:
+ return "PACKET_STAGE_INPUT";
+ case PACKET_STAGE_FORWARD:
+ return "PACKET_STAGE_FORWARD";
+ case PACKET_STAGE_OUTPUT:
+ return "PACKET_STAGE_OUTPUT";
+ case PACKET_STAGE_POSTROUTING:
+ return "PACKET_STAGE_POSTROUTING";
+ default:
+ return "PACKET_STAGE_UNKNOWN";
+ }
+}
+
+struct packet_manager_runtime *packet_manager_runtime_new()
+{
+ struct packet_manager_runtime *pkt_mgr_rt = calloc(1, sizeof(struct packet_manager_runtime));
+ if (pkt_mgr_rt == NULL)
+ {
+ PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager_runtime");
+ return NULL;
+ }
+
+ for (int i = 0; i < PACKET_QUEUE_MAX; i++)
+ {
+ TAILQ_INIT(&pkt_mgr_rt->queue[i]);
+ }
+
+ return pkt_mgr_rt;
+}
+
+void packet_manager_runtime_free(struct packet_manager_runtime *pkt_mgr_rt)
+{
+ if (pkt_mgr_rt)
+ {
+ for (int i = 0; i < PACKET_QUEUE_MAX; i++)
+ {
+ struct packet *pkt = NULL;
+ while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i])))
+ {
+ TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe);
+
+ // TODO: free packet and free mbuff
+ packet_free(pkt);
+ }
+ }
+ }
+ free(pkt_mgr_rt);
+ pkt_mgr_rt = NULL;
+}
+
+void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt)
+{
+ pkt_mgr_rt->mq = mq_rt;
+}
+
+void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt)
+{
+ pkt_mgr_rt->stat.total.pkts_ingress++;
+ pkt_mgr_rt->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++;
+ TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe);
+}
+
+struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt)
+{
+ struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_MAX]);
+ if (pkt)
+ {
+ pkt_mgr_rt->stat.total.pkts_egress++;
+ pkt_mgr_rt->stat.queue[PACKET_STAGE_MAX].pkts_out++;
+ TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_MAX], pkt, stage_tqe);
+ }
+ return pkt;
+}
+
+void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt)
+{
+ for (int i = 0; i < PACKET_STAGE_MAX; i++)
+ {
+ pkt_mgr_rt->stage = i;
+
+ struct packet *pkt = NULL;
+ while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[pkt_mgr_rt->stage])))
+ {
+ packet_set_claim(pkt, false);
+ pkt_mgr_rt->claimed_cb = NULL;
+ pkt_mgr_rt->cb_args = NULL;
+
+ TAILQ_REMOVE(&pkt_mgr_rt->queue[pkt_mgr_rt->stage], pkt, stage_tqe);
+ pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_out++;
+
+ mq_runtime_publish_message(pkt_mgr_rt->mq, pkt_mgr_rt->stage, pkt);
+ mq_runtime_dispatch(pkt_mgr_rt->mq);
+
+ if (packet_is_claim(pkt))
+ {
+ if (pkt_mgr_rt->claimed_cb)
+ {
+ pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args);
+ }
+ packet_set_claim(pkt, false);
+ continue;
+ }
+
+ TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[pkt_mgr_rt->stage + 1], pkt, stage_tqe);
+ pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage + 1].pkts_in++;
+ }
+ }
+ pkt_mgr_rt->stage = -1;
+}
+
+int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args)
+{
+ if (packet_is_claim(pkt))
+ {
+ PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again");
+ return -1;
+ }
+ else
+ {
+ pkt_mgr_rt->claimed_cb = cb;
+ pkt_mgr_rt->cb_args = args;
+ packet_set_claim(pkt, true);
+ pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_claim++;
+ return 0;
+ }
+}
+
+void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage)
+{
+ if (stage >= PACKET_STAGE_MAX)
+ {
+ PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage);
+ assert(0);
+ return;
+ }
+
+ pkt_mgr_rt->stat.queue[stage].pkts_schedule++;
+ pkt_mgr_rt->stat.queue[stage].pkts_in++;
+ TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe);
+}
+
+struct packet_manager_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *pkt_mgr_rt)
+{
+ return &pkt_mgr_rt->stat;
+}
+
+void packet_manager_runtime_print_stat(struct packet_manager_runtime *pkt_mgr_rt)
+{
+ PACKET_MANAGER_LOG_INFO("runtime: %p, pkts_ingress: %lu, pkts_egress: %lu",
+ pkt_mgr_rt, pkt_mgr_rt->stat.total.pkts_ingress, pkt_mgr_rt->stat.total.pkts_egress);
+ for (int i = 0; i < PACKET_QUEUE_MAX; i++)
+ {
+ PACKET_MANAGER_LOG_INFO("runtime: %p, %-24s stat => pkts_in: %lu, pkts_out: %lu, pkts_claim: %lu, pkts_schedule: %lu",
+ pkt_mgr_rt,
+ packet_stage_to_str(i),
+ pkt_mgr_rt->stat.queue[i].pkts_in,
+ pkt_mgr_rt->stat.queue[i].pkts_out,
+ pkt_mgr_rt->stat.queue[i].pkts_claim,
+ pkt_mgr_rt->stat.queue[i].pkts_schedule);
+ }
+}
diff --git a/infra/packet_manager/packet_manager_runtime.h b/infra/packet_manager/packet_manager_runtime.h
new file mode 100644
index 0000000..35a7307
--- /dev/null
+++ b/infra/packet_manager/packet_manager_runtime.h
@@ -0,0 +1,69 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include <sys/queue.h>
+
+#include "log_internal.h"
+#include "stellar/mq.h"
+#include "stellar/packet_manager.h"
+
+#define PACKET_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
+#define PACKET_MANAGER_LOG_DEBUG(format, ...) STELLAR_LOG_DEBUG(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
+#define PACKET_MANAGER_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
+#define PACKET_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet manager", format, ##__VA_ARGS__)
+
+#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1)
+
+TAILQ_HEAD(packet_queue, packet);
+
+struct packet_manager_stat
+{
+ struct
+ {
+ uint64_t pkts_ingress;
+ uint64_t pkts_egress;
+ } total;
+ struct
+ {
+ uint64_t pkts_in; // include the packets that are scheduled
+ uint64_t pkts_out; // include the packets that are claimed
+ uint64_t pkts_claim;
+ uint64_t pkts_schedule;
+ } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets
+};
+
+struct packet_manager_runtime
+{
+ enum packet_stage stage;
+ struct packet_queue queue[PACKET_QUEUE_MAX];
+
+ void *cb_args;
+ on_packet_claimed_callback *claimed_cb;
+
+ struct mq_runtime *mq;
+ struct packet_manager_stat stat;
+};
+
+struct packet_manager_runtime *packet_manager_runtime_new();
+void packet_manager_runtime_free(struct packet_manager_runtime *pkt_mgr_rt);
+
+void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt);
+void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
+struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt);
+void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt);
+
+int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args);
+void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage);
+
+struct packet_manager_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *pkt_mgr_rt);
+void packet_manager_runtime_print_stat(struct packet_manager_runtime *pkt_mgr_rt);
+
+const char *packet_stage_to_str(enum packet_stage stage);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp
index a3674c6..7f97748 100644
--- a/infra/packet_manager/test/gtest_packet_manager.cpp
+++ b/infra/packet_manager/test/gtest_packet_manager.cpp
@@ -1,8 +1,8 @@
#include <gtest/gtest.h>
-#include "stellar/mq.h"
#include "packet_parser.h"
#include "packet_internal.h"
+#include "packet_manager_runtime.h"
#include "packet_manager_internal.h"
/******************************************************************************
diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c
index d150dc9..a2b2fd7 100644
--- a/infra/session_manager/session_manager.c
+++ b/infra/session_manager/session_manager.c
@@ -305,6 +305,7 @@ void session_manager_free(struct session_manager *sess_mgr)
clean_session(sess_mgr_rt, UINT64_MAX);
}
+ SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, i);
session_manager_runtime_print_stat(sess_mgr_rt);
session_manager_runtime_free(sess_mgr->runtime[i]);
}
diff --git a/infra/session_manager/session_manager_runtime.c b/infra/session_manager/session_manager_runtime.c
index 14a4ed3..f89176c 100644
--- a/infra/session_manager/session_manager_runtime.c
+++ b/infra/session_manager/session_manager_runtime.c
@@ -1292,22 +1292,22 @@ void session_manager_runtime_print_stat(struct session_manager_runtime *sess_mgr
struct session_manager_stat *stat = &sess_mgr_rt->stat;
// TCP session
- SESSION_MANAGER_LOG_INFO("runtime[%p] => TCP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
+ SESSION_MANAGER_LOG_INFO("runtime: %p, TCP session stat => history: %lu, used: %lu, opening: %lu, active: %lu, closing: %lu, discard: %lu, closed: %lu",
sess_mgr_rt, stat->history_tcp_sessions, stat->tcp_sess_used, stat->tcp_sess_opening, stat->tcp_sess_active,
stat->tcp_sess_closing, stat->tcp_sess_discard, stat->tcp_sess_closed);
// UDP session
- SESSION_MANAGER_LOG_INFO("runtime[%p] => UDP session: history=%lu, used=%lu, opening=%lu, active=%lu, closing=%lu, discard=%lu, closed=%lu",
+ SESSION_MANAGER_LOG_INFO("runtime: %p, UDP session stat => history: %lu, used: %lu, opening: %lu, active: %lu, closing: %lu, discard: %lu, closed: %lu",
sess_mgr_rt, stat->history_udp_sessions, stat->udp_sess_used, stat->udp_sess_opening, stat->udp_sess_active,
stat->udp_sess_closing, stat->udp_sess_discard, stat->udp_sess_closed);
// evicted session
- SESSION_MANAGER_LOG_INFO("runtime[%p] => evicted session: TCP=%lu, UDP=%lu", sess_mgr_rt, stat->tcp_sess_evicted, stat->udp_sess_evicted);
+ SESSION_MANAGER_LOG_INFO("runtime: %p, evicted session stat => TCP: %lu, UDP: %lu", sess_mgr_rt, stat->tcp_sess_evicted, stat->udp_sess_evicted);
// Bypassed packet
- SESSION_MANAGER_LOG_INFO("runtime[%p] => bypassed TCP packet: table_full=%lu, session_not_found=%lu, duplicated=%lu",
+ SESSION_MANAGER_LOG_INFO("runtime: %p, bypassed TCP packet stat => table_full: %lu, session_not_found: %lu, duplicated: %lu",
sess_mgr_rt, stat->tcp_pkts_bypass_table_full, stat->tcp_pkts_bypass_session_not_found, stat->tcp_pkts_bypass_duplicated);
- SESSION_MANAGER_LOG_INFO("runtime[%p] => bypassed UDP packet: table_full=%lu, session_evicted=%lu, duplicated=%lu",
+ SESSION_MANAGER_LOG_INFO("runtime: %p, bypassed UDP packet stat => table_full: %lu, session_evicted: %lu, duplicated: %lu",
sess_mgr_rt, stat->udp_pkts_bypass_table_full, stat->udp_pkts_bypass_session_evicted, stat->udp_pkts_bypass_duplicated);
// TCP segment
- SESSION_MANAGER_LOG_INFO("runtime[%p] => TCP segment: input=%lu, consumed=%lu, timeout=%lu, retransmited=%lu, overlapped=%lu, omitted_too_many=%lu, inorder=%lu, reordered=%lu, buffered=%lu, freed=%lu",
+ SESSION_MANAGER_LOG_INFO("runtime: %p, TCP segment stat => input: %lu, consumed: %lu, timeout: %lu, retransmited: %lu, overlapped: %lu, omitted_too_many: %lu, inorder: %lu, reordered: %lu, buffered: %lu, freed: %lu",
sess_mgr_rt, stat->tcp_segs_input, stat->tcp_segs_consumed, stat->tcp_segs_timeout, stat->tcp_segs_retransmited,
stat->tcp_segs_overlapped, stat->tcp_segs_omitted_too_many, stat->tcp_segs_inorder, stat->tcp_segs_reordered,
stat->tcp_segs_buffered, stat->tcp_segs_freed);
@@ -1425,7 +1425,7 @@ uint64_t session_manager_runtime_scan(const struct session_manager_runtime *sess
}
}
- SESSION_MANAGER_LOG_DEBUG("session scan: cursor=%lu, count=%lu, mached_sess_num=%lu", opts->cursor, opts->count, mached_sess_num);
+ SESSION_MANAGER_LOG_DEBUG("session scan => cursor: %lu, count: %lu, mached_sess_num: %lu", opts->cursor, opts->count, mached_sess_num);
return mached_sess_num;
}
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index 0d85a7e..a63a5bd 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -147,7 +147,6 @@ static int stellar_thread_run(struct stellar *st)
static void stellar_thread_join(struct stellar *st)
{
- CORE_LOG_FATAL("waiting worker thread exit");
for (uint16_t i = 0; i < st->thread_num; i++)
{
if (st->threads[i].is_runing == 0)
@@ -158,7 +157,6 @@ static void stellar_thread_join(struct stellar *st)
struct stellar_thread *thread = &st->threads[i];
pthread_join(thread->tid, NULL);
}
- CORE_LOG_FATAL("all worker thread exited");
}
struct stellar *stellar_new(const char *toml_file)
@@ -241,7 +239,6 @@ void stellar_run(struct stellar *st)
if (packet_io_isbreak(st->pkt_io))
{
ATOMIC_SET(&st->need_exit, 1);
- CORE_LOG_FATAL("notify worker thread to exit");
break;
}
}
diff --git a/test/session_debugger/session_debugger.c b/test/session_debugger/session_debugger.c
index a41610c..b54d739 100644
--- a/test/session_debugger/session_debugger.c
+++ b/test/session_debugger/session_debugger.c
@@ -136,7 +136,7 @@ static void session_debugger_exdata_free(struct session_debugger_exdata *exdata)
static void session_debugger_exdata_free_callback(int idx, void *ex_ptr, void *arg)
{
- struct session_debugger *dbg = (struct session_debugger *)arg;
+ __attribute__((unused)) struct session_debugger *dbg = (struct session_debugger *)arg;
assert(idx == dbg->sess_exdata_idx);
session_debugger_exdata_free((struct session_debugger_exdata *)ex_ptr);
@@ -244,7 +244,7 @@ static struct session_debugger *session_debugger_new(struct session_manager *ses
goto error_out;
}
- dbg->sess_exdata_idx = session_manager_new_session_exdata_index(dbg->sess_mgr, "DEBUG_MODULE_SESS_EXDATA", session_debugger_exdata_free_callback, dbg);
+ dbg->sess_exdata_idx = session_manager_new_session_exdata_index(dbg->sess_mgr, "session_debugger_exdata", session_debugger_exdata_free_callback, dbg);
if (dbg->sess_exdata_idx == -1)
{
session_debugger_log(STDERR_FILENO, "new session exdata index failed\n");