summaryrefslogtreecommitdiff
path: root/infra/packet_manager
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-09-14 18:38:37 +0800
committerluwenpeng <[email protected]>2024-09-18 14:36:31 +0800
commit721d5d1466541cc54a991fc5359cd7013e10f936 (patch)
tree0c0ea2064c7540ad9c4b8d096b88751ed3861e60 /infra/packet_manager
parentf559d67b93df78e9f6d5c3fe301b688b5c857d98 (diff)
feature(packet manager): support claim packt and add test case
Diffstat (limited to 'infra/packet_manager')
-rw-r--r--infra/packet_manager/packet_manager.c105
-rw-r--r--infra/packet_manager/packet_manager_private.h20
-rw-r--r--infra/packet_manager/packet_private.h6
-rw-r--r--infra/packet_manager/packet_utils.c24
-rw-r--r--infra/packet_manager/test/gtest_packet_manager.cpp199
5 files changed, 288 insertions, 66 deletions
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c
index b13802f..4cb92ca 100644
--- a/infra/packet_manager/packet_manager.c
+++ b/infra/packet_manager/packet_manager.c
@@ -16,17 +16,6 @@ struct packet_manager_config
uint16_t nr_worker_thread;
};
-struct packet_manager_stat
-{
- uint64_t input_pkts;
- uint64_t output_pkts;
-
- uint64_t take_pkts;
- uint64_t schedule_pkts;
-
- uint64_t curr_queue_len[PACKET_STAGE_MAX];
-};
-
struct packet_manager_schema
{
struct mq_schema *mq;
@@ -36,9 +25,11 @@ struct packet_manager_schema
struct packet_manager_runtime
{
uint16_t idx;
+ void *cb_args;
+ on_packet_claimed_callback *claimed_cb;
struct mq_runtime *mq;
- struct packet_queue queue[PACKET_STAGE_MAX];
- struct packet_manager_stat stat;
+ struct packet_queue queue[QUEUE_NUM_MAX];
+ struct packet_manager_runtime_stat stat;
};
struct packet_manager
@@ -192,7 +183,7 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime)
{
if (runtime)
{
- for (int i = 0; i < PACKET_STAGE_MAX; i++)
+ for (int i = 0; i < QUEUE_NUM_MAX; i++)
{
struct packet *pkt = NULL;
while ((pkt = TAILQ_FIRST(&runtime->queue[i])))
@@ -219,7 +210,7 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx)
runtime->idx = idx;
- for (int i = 0; i < PACKET_STAGE_MAX; i++)
+ for (int i = 0; i < QUEUE_NUM_MAX; i++)
{
TAILQ_INIT(&runtime->queue[i]);
}
@@ -227,17 +218,22 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx)
return runtime;
}
-static void packet_manager_runtime_stat(struct packet_manager_runtime *runtime)
+void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime)
{
- PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, take_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}",
+ PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, claim_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}",
runtime->idx,
runtime->stat.input_pkts, runtime->stat.output_pkts,
- runtime->stat.take_pkts, runtime->stat.schedule_pkts,
- runtime->stat.curr_queue_len[PACKET_STAGE_PREROUTING],
- runtime->stat.curr_queue_len[PACKET_STAGE_INPUT],
- runtime->stat.curr_queue_len[PACKET_STAGE_FORWARD],
- runtime->stat.curr_queue_len[PACKET_STAGE_OUTPUT],
- runtime->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]);
+ runtime->stat.claim_pkts, runtime->stat.schedule_pkts,
+ runtime->stat.queue_len[PACKET_STAGE_PREROUTING],
+ runtime->stat.queue_len[PACKET_STAGE_INPUT],
+ runtime->stat.queue_len[PACKET_STAGE_FORWARD],
+ runtime->stat.queue_len[PACKET_STAGE_OUTPUT],
+ runtime->stat.queue_len[PACKET_STAGE_POSTROUTING]);
+}
+
+struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime)
+{
+ return &runtime->stat;
}
void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt)
@@ -248,18 +244,18 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru
void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt)
{
pkt_mgr_rt->stat.input_pkts++;
- pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_PREROUTING]++;
+ pkt_mgr_rt->stat.queue_len[PACKET_STAGE_PREROUTING]++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe);
}
struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt)
{
- struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING]);
+ struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1]);
if (pkt)
{
pkt_mgr_rt->stat.output_pkts++;
- pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]--;
- TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING], pkt, stage_tqe);
+ pkt_mgr_rt->stat.queue_len[QUEUE_NUM_MAX - 1]--;
+ TAILQ_REMOVE(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1], pkt, stage_tqe);
}
return pkt;
}
@@ -268,47 +264,64 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt)
{
for (int i = 0; i < PACKET_STAGE_MAX; i++)
{
- // packet_manager_runtime_stat(pkt_mgr_rt);
+ packet_manager_runtime_print_stat(pkt_mgr_rt);
struct packet *pkt = NULL;
while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i])))
{
+ packet_set_claim(pkt, false);
+ pkt_mgr_rt->claimed_cb = NULL;
+ pkt_mgr_rt->cb_args = NULL;
+
TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe);
- pkt_mgr_rt->stat.curr_queue_len[i]--;
+ pkt_mgr_rt->stat.queue_len[i]--;
mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt);
mq_runtime_dispatch(pkt_mgr_rt->mq);
- if (packet_is_stolen(pkt))
+ if (packet_is_claim(pkt))
{
+ if (pkt_mgr_rt->claimed_cb)
+ {
+ pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args);
+ }
continue;
}
- if (i + 1 == PACKET_STAGE_MAX)
- {
- TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i], pkt, stage_tqe);
- pkt_mgr_rt->stat.curr_queue_len[i]++;
- break;
- }
- else
- {
- TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe);
- pkt_mgr_rt->stat.curr_queue_len[i + 1]++;
- }
+ TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe);
+ pkt_mgr_rt->stat.queue_len[i + 1]++;
}
}
}
-void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt)
+int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args)
{
- pkt_mgr_rt->stat.take_pkts++;
- packet_set_stolen(pkt, true);
+ if (packet_is_claim(pkt))
+ {
+ PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again");
+ return -1;
+ }
+ else
+ {
+ pkt_mgr_rt->claimed_cb = cb;
+ pkt_mgr_rt->cb_args = args;
+ packet_set_claim(pkt, true);
+ pkt_mgr_rt->stat.claim_pkts++;
+ return 0;
+ }
}
void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage)
{
+ if (stage >= PACKET_STAGE_MAX)
+ {
+ PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage);
+ assert(0);
+ return;
+ }
+
pkt_mgr_rt->stat.schedule_pkts++;
- packet_set_stolen(pkt, false);
+ pkt_mgr_rt->stat.queue_len[stage]++;
TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe);
}
@@ -364,7 +377,7 @@ void packet_manager_free(struct packet_manager *pkt_mgr)
{
if (pkt_mgr->runtime[i])
{
- packet_manager_runtime_stat(pkt_mgr->runtime[i]);
+ packet_manager_runtime_print_stat(pkt_mgr->runtime[i]);
packet_manager_runtime_free(pkt_mgr->runtime[i]);
}
}
diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h
index 3f0dcf3..a24fa7b 100644
--- a/infra/packet_manager/packet_manager_private.h
+++ b/infra/packet_manager/packet_manager_private.h
@@ -14,8 +14,26 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru
void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt);
void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt);
-// for debug
+
+/******************************************************************************
+ * for gtest
+ ******************************************************************************/
+
+#define QUEUE_NUM_MAX (PACKET_STAGE_MAX + 1) // the last queue is for egress
+struct packet_manager_runtime_stat
+{
+ uint64_t input_pkts;
+ uint64_t output_pkts;
+
+ uint64_t claim_pkts;
+ uint64_t schedule_pkts;
+
+ uint64_t queue_len[QUEUE_NUM_MAX];
+};
+
const char *packet_stage_to_str(enum packet_stage stage);
+void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime);
+struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime);
#ifdef __cplusplus
}
diff --git a/infra/packet_manager/packet_private.h b/infra/packet_manager/packet_private.h
index 915b763..e312e1b 100644
--- a/infra/packet_manager/packet_private.h
+++ b/infra/packet_manager/packet_private.h
@@ -29,7 +29,7 @@ struct metadata
uint64_t domain;
uint16_t link_id;
bool is_ctrl;
- bool is_stolen;
+ bool is_claim;
enum packet_direction direction;
enum packet_action action;
@@ -102,8 +102,8 @@ uint16_t packet_get_link_id(const struct packet *pkt);
void packet_set_ctrl(struct packet *pkt, bool ctrl);
bool packet_is_ctrl(const struct packet *pkt);
-void packet_set_stolen(struct packet *pkt, bool stolen);
-bool packet_is_stolen(const struct packet *pkt);
+void packet_set_claim(struct packet *pkt, bool claim);
+bool packet_is_claim(const struct packet *pkt);
void packet_set_direction(struct packet *pkt, enum packet_direction dir);
diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c
index 3ec12f3..86cc00b 100644
--- a/infra/packet_manager/packet_utils.c
+++ b/infra/packet_manager/packet_utils.c
@@ -1,3 +1,5 @@
+#include <assert.h>
+
#include "tuple.h"
#include "uthash.h"
#include "log_private.h"
@@ -101,14 +103,14 @@ bool packet_is_ctrl(const struct packet *pkt)
return pkt->meta.is_ctrl;
}
-void packet_set_stolen(struct packet *pkt, bool stolen)
+void packet_set_claim(struct packet *pkt, bool claim)
{
- pkt->meta.is_stolen = stolen;
+ pkt->meta.is_claim = claim;
}
-bool packet_is_stolen(const struct packet *pkt)
+bool packet_is_claim(const struct packet *pkt)
{
- return pkt->meta.is_stolen;
+ return pkt->meta.is_claim;
}
void packet_set_direction(struct packet *pkt, enum packet_direction dir)
@@ -929,9 +931,19 @@ struct packet *packet_dup(const struct packet *pkt)
void packet_free(struct packet *pkt)
{
- if (pkt && pkt->need_free)
+ if (pkt)
{
- free((void *)pkt);
+ if (packet_is_claim(pkt))
+ {
+ PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order");
+ assert(0);
+ return;
+ }
+
+ if (pkt->need_free)
+ {
+ free((void *)pkt);
+ }
}
}
diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp
index 2fee4a4..e53b69d 100644
--- a/infra/packet_manager/test/gtest_packet_manager.cpp
+++ b/infra/packet_manager/test/gtest_packet_manager.cpp
@@ -67,6 +67,19 @@ unsigned char data[] = {
0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01,
0x04, 0x02};
+static void check_stat(struct packet_manager_runtime_stat *stat, uint64_t input_pkts, uint64_t output_pkts, uint64_t claim_pkts, uint64_t schedule_pkts)
+{
+ EXPECT_TRUE(stat->input_pkts == input_pkts);
+ EXPECT_TRUE(stat->output_pkts == output_pkts);
+ EXPECT_TRUE(stat->claim_pkts == claim_pkts);
+ EXPECT_TRUE(stat->schedule_pkts == schedule_pkts);
+
+ for (int i = 0; i < PACKET_STAGE_MAX; i++)
+ {
+ EXPECT_TRUE(stat->queue_len[i] == 0);
+ }
+}
+
#if 1
TEST(PACKET_MANAGER, NEW_FREE)
{
@@ -86,20 +99,19 @@ TEST(PACKET_MANAGER, NEW_FREE)
}
#endif
+#if 1
static void on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
{
- printf("stage: %s\n", packet_stage_to_str(stage));
+ printf("on_packet_stage: %s\n", packet_stage_to_str(stage));
static int count = 0;
EXPECT_TRUE(count == stage);
- count++;
-
EXPECT_TRUE(packet_is_ctrl(pkt));
EXPECT_TRUE(args == NULL);
+ count++;
}
-#if 1
-TEST(PACKET_MANAGER, NORNMAL)
+TEST(PACKET_MANAGER, SUBSCRIBER)
{
// global init
struct mq_schema *mq_schema = mq_schema_new();
@@ -129,9 +141,12 @@ TEST(PACKET_MANAGER, NORNMAL)
packet_parse(&pkt, (const char *)data, sizeof(data));
packet_set_ctrl(&pkt, true);
+ check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0);
packet_manager_runtime_input(runtime, &pkt);
packet_manager_runtime_dispatch(runtime);
EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt);
+ EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL);
+ check_stat(packet_manager_runtime_get_stat(runtime), 1, 1, 0, 0);
// per-thread free
@@ -145,18 +160,182 @@ TEST(PACKET_MANAGER, NORNMAL)
#endif
#if 1
-TEST(PACKET_MANAGER, TAKE)
+static void packet_claimed(struct packet *pkt, void *args)
{
- // TODO
- // packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
+ char *str = (char *)args;
+ EXPECT_STREQ(str, "hello");
+ printf("packet_claimed: with ctx %s\n", str);
+ EXPECT_TRUE(packet_is_ctrl(pkt));
+ EXPECT_TRUE(packet_is_claim(pkt));
+ free(str);
+}
+
+static void module_A_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
+{
+ struct packet_manager *pkt_mgr = (struct packet_manager *)args;
+ struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0);
+ EXPECT_TRUE(pkt_mgr_rt);
+
+ printf("module_A_on_packet_stage: %s claim packet success\n", packet_stage_to_str(stage));
+
+ static int count = 0;
+ EXPECT_TRUE(count == 0);
+ EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING);
+ EXPECT_TRUE(packet_is_ctrl(pkt));
+ EXPECT_TRUE(!packet_is_claim(pkt));
+ packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, packet_claimed, strdup("hello"));
+ count++;
+}
+
+static void module_B_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
+{
+ struct packet_manager *pkt_mgr = (struct packet_manager *)args;
+ struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0);
+ EXPECT_TRUE(pkt_mgr_rt);
+
+ printf("module_B_on_packet_stage: %s claim packet failed\n", packet_stage_to_str(stage));
+
+ static int count = 0;
+ EXPECT_TRUE(count == 0);
+ EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING);
+ EXPECT_TRUE(packet_is_ctrl(pkt));
+ EXPECT_TRUE(packet_is_claim(pkt));
+ count++;
+}
+
+TEST(PACKET_MANAGER, CLAIM_PACKET)
+{
+ // global init
+ struct mq_schema *mq_schema = mq_schema_new();
+ EXPECT_TRUE(mq_schema);
+ struct mq_runtime *mq_rt = mq_runtime_new(mq_schema);
+ EXPECT_TRUE(mq_rt);
+
+ // module init
+ struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml");
+ EXPECT_TRUE(pkt_mgr);
+ struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr);
+ EXPECT_TRUE(schema);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_A_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_A_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_A_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_A_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_A_on_packet_stage, pkt_mgr) == 0);
+
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_B_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_B_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_B_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_B_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_B_on_packet_stage, pkt_mgr) == 0);
+
+ // per-thread init
+ struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0);
+ EXPECT_TRUE(runtime);
+ packet_manager_runtime_init(runtime, mq_rt);
+
+ // per-thread run
+ struct packet pkt;
+ memset(&pkt, 0, sizeof(pkt));
+ packet_parse(&pkt, (const char *)data, sizeof(data));
+ packet_set_ctrl(&pkt, true);
+
+ check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0);
+ packet_manager_runtime_input(runtime, &pkt);
+ packet_manager_runtime_dispatch(runtime);
+ EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL);
+ check_stat(packet_manager_runtime_get_stat(runtime), 1, 0, 1, 0);
+
+ // per-thread free
+
+ // module free
+ packet_manager_free(pkt_mgr);
+
+ // global free
+ mq_runtime_free(mq_rt);
+ mq_schema_free(mq_schema);
+}
+#endif
+
+#if 1
+static void module_C_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args)
+{
+ struct packet_manager *pkt_mgr = (struct packet_manager *)args;
+ struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0);
+ EXPECT_TRUE(pkt_mgr_rt);
+
+ printf("module_C_on_packet_stage: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt);
+
+ EXPECT_TRUE(!packet_is_claim(pkt));
+
+ if (stage == PACKET_STAGE_PREROUTING)
+ {
+ packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_INPUT);
+ packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_FORWARD);
+ packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_OUTPUT);
+ packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_POSTROUTING);
+ }
+}
+
+TEST(PACKET_MANAGER, SCHEDULE_PACKET)
+{
+ // global init
+ struct mq_schema *mq_schema = mq_schema_new();
+ EXPECT_TRUE(mq_schema);
+ struct mq_runtime *mq_rt = mq_runtime_new(mq_schema);
+ EXPECT_TRUE(mq_rt);
+
+ // module init
+ struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml");
+ EXPECT_TRUE(pkt_mgr);
+ struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr);
+ EXPECT_TRUE(schema);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_C_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_C_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_C_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_C_on_packet_stage, pkt_mgr) == 0);
+ EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_C_on_packet_stage, pkt_mgr) == 0);
+
+ // per-thread init
+ struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0);
+ EXPECT_TRUE(runtime);
+ packet_manager_runtime_init(runtime, mq_rt);
+
+ // per-thread run
+ struct packet pkt;
+ memset(&pkt, 0, sizeof(pkt));
+ packet_parse(&pkt, (const char *)data, sizeof(data));
+ packet_set_ctrl(&pkt, true);
+
+ check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0);
+ packet_manager_runtime_input(runtime, &pkt);
+ packet_manager_runtime_dispatch(runtime);
+
+ struct packet *tmp = NULL;
+ for (int i = 0; i < 4; i++)
+ {
+ tmp = packet_manager_runtime_output(runtime);
+ EXPECT_TRUE(tmp);
+ packet_free(tmp);
+ }
+ EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt);
+ EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL);
+ check_stat(packet_manager_runtime_get_stat(runtime), 1, 5, 0, 4);
+
+ // per-thread free
+
+ // module free
+ packet_manager_free(pkt_mgr);
+
+ // global free
+ mq_runtime_free(mq_rt);
+ mq_schema_free(mq_schema);
}
#endif
#if 1
-TEST(PACKET_MANAGER, SCHEDULE)
+TEST(PACKET_MANAGER, CLAIM_AND_SCHEDULE_PACKET)
{
// TODO
- // packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage);
}
#endif