summaryrefslogtreecommitdiff
path: root/infra
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-10-09 11:46:53 +0800
committerluwenpeng <[email protected]>2024-10-09 11:53:10 +0800
commit9e954386fd868d358bb18d2fcffc92dfe601bf5e (patch)
tree7d18aca13f0d3472322d270f4e08a087b816b2ea /infra
parent0f082d975ec067e268d868b0855083caaeff9522 (diff)
Refactored packet IO to use packet_manager_schedule_packet() instead of stellar_send_build_packet() to send user-built packets
Diffstat (limited to 'infra')
-rw-r--r--infra/packet_io/marsio_io.c82
-rw-r--r--infra/packet_io/marsio_io.h1
-rw-r--r--infra/packet_io/packet_io.c8
-rw-r--r--infra/packet_io/packet_io.h1
-rw-r--r--infra/packet_io/pcap_io.c102
-rw-r--r--infra/packet_io/pcap_io.h1
-rw-r--r--infra/stellar_core.c36
-rw-r--r--infra/version.map2
8 files changed, 77 insertions, 156 deletions
diff --git a/infra/packet_io/marsio_io.c b/infra/packet_io/marsio_io.c
index cad5ce1..cf0b6d8 100644
--- a/infra/packet_io/marsio_io.c
+++ b/infra/packet_io/marsio_io.c
@@ -10,7 +10,7 @@
#include "packet_parser.h"
#include "packet_internal.h"
-#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__)
+#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio io", format, ##__VA_ARGS__)
struct marsio_io
{
@@ -329,7 +329,9 @@ uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts,
void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
+ int is_injected_packet = 0;
int len;
+ char *ptr;
struct packet *pkt;
marsio_buff_t *mbuff;
struct marsio_io *mr_io = (struct marsio_io *)handle;
@@ -337,17 +339,29 @@ void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint1
for (uint16_t i = 0; i < nr_pkts; i++)
{
+ is_injected_packet = 0;
pkt = &pkts[i];
len = packet_get_raw_len(pkt);
- stat->pkts_tx++;
- stat->bytes_tx += len;
-
mbuff = (marsio_buff_t *)packet_get_origin_ctx(pkt);
- assert(mbuff != NULL);
+ if (mbuff == NULL)
+ {
+ if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
+ {
+ MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet");
+ continue;
+ }
+ ptr = marsio_buff_append(mbuff, len);
+ memcpy(ptr, packet_get_raw_data(pkt), len);
+
+ is_injected_packet = 1;
+ }
metadata_from_packet_to_mbuff(pkt, mbuff);
- if (marsio_buff_is_ctrlbuf(mbuff))
+ stat->pkts_tx++;
+ stat->bytes_tx += len;
+
+ if (packet_is_ctrl(pkt))
{
stat->ctrl_pkts_tx++;
stat->ctrl_bytes_tx += len;
@@ -358,8 +372,16 @@ void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint1
stat->raw_bytes_tx += len;
}
- marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1);
- packet_free(pkt);
+ if (is_injected_packet)
+ {
+ stat->pkts_injected++;
+ stat->bytes_injected += len;
+ marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH);
+ }
+ else
+ {
+ marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1);
+ }
}
}
@@ -380,53 +402,9 @@ void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_
stat->bytes_dropped += packet_get_raw_len(pkt);
marsio_buff_free(mr_io->mr_ins, &mbuff, 1, 0, thr_idx);
}
- packet_free(pkt);
}
}
-uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
-{
- int len;
- char *ptr;
- uint16_t nr_inject = 0;
- struct packet *pkt;
- marsio_buff_t *mbuff;
- struct marsio_io *mr_io = (struct marsio_io *)handle;
- struct packet_io_stat *stat = &mr_io->stat[thr_idx];
-
- for (uint16_t i = 0; i < nr_pkts; i++)
- {
- pkt = &pkts[i];
- len = packet_get_raw_len(pkt);
-
- if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
- {
- MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet");
- continue;
- }
-
- stat->pkts_injected++;
- stat->bytes_injected += len;
-
- stat->raw_pkts_tx++;
- stat->raw_bytes_tx += len;
-
- stat->pkts_tx++;
- stat->bytes_tx += len;
-
- nr_inject++;
-
- ptr = marsio_buff_append(mbuff, len);
- memcpy(ptr, packet_get_raw_data(pkt), len);
- metadata_from_packet_to_mbuff(pkt, mbuff);
-
- marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH);
- packet_free(pkt);
- }
-
- return nr_inject;
-}
-
void marsio_io_yield(void *handle, uint16_t thr_idx)
{
struct marsio_io *mr_io = (struct marsio_io *)handle;
diff --git a/infra/packet_io/marsio_io.h b/infra/packet_io/marsio_io.h
index afbb530..ab2f37c 100644
--- a/infra/packet_io/marsio_io.h
+++ b/infra/packet_io/marsio_io.h
@@ -15,7 +15,6 @@ int marsio_io_init(void *handle, uint16_t thr_idx);
uint16_t marsio_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void marsio_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void marsio_io_yield(void *handle, uint16_t thr_idx);
struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx);
diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c
index 45eda02..897f2c9 100644
--- a/infra/packet_io/packet_io.c
+++ b/infra/packet_io/packet_io.c
@@ -23,7 +23,6 @@ struct packet_io
uint16_t (*ingress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void (*egress_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
- uint16_t (*inject_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void (*yield_func)(void *handle, uint16_t thr_idx);
struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx);
};
@@ -261,7 +260,6 @@ struct packet_io *packet_io_new(const char *toml_file)
pkt_io->ingress_func = marsio_io_ingress;
pkt_io->egress_func = marsio_io_egress;
pkt_io->drop_func = marsio_io_drop;
- pkt_io->inject_func = marsio_io_inject;
pkt_io->yield_func = marsio_io_yield;
pkt_io->stat_func = marsio_io_stat;
}
@@ -274,7 +272,6 @@ struct packet_io *packet_io_new(const char *toml_file)
pkt_io->ingress_func = pcap_io_ingress;
pkt_io->egress_func = pcap_io_egress;
pkt_io->drop_func = pcap_io_drop;
- pkt_io->inject_func = pcap_io_inject;
pkt_io->yield_func = pcap_io_yield;
pkt_io->stat_func = pcap_io_stat;
}
@@ -331,11 +328,6 @@ void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *p
pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
}
-uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
-{
- return pkt_io->inject_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
-}
-
void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx)
{
pkt_io->yield_func(pkt_io->handle, thr_idx);
diff --git a/infra/packet_io/packet_io.h b/infra/packet_io/packet_io.h
index d5b089b..efb9453 100644
--- a/infra/packet_io/packet_io.h
+++ b/infra/packet_io/packet_io.h
@@ -74,7 +74,6 @@ int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx);
uint16_t packet_io_ingress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void packet_io_egress(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx);
struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx);
diff --git a/infra/packet_io/pcap_io.c b/infra/packet_io/pcap_io.c
index 137b9da..00220c9 100644
--- a/infra/packet_io/pcap_io.c
+++ b/infra/packet_io/pcap_io.c
@@ -407,10 +407,15 @@ uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, ui
void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
int len;
+ struct tuple6 tuple;
struct packet *pkt = NULL;
struct pcap_io *pcap_io = (struct pcap_io *)handle;
struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
+ char file[PATH_MAX] = {0};
+ char src_addr[INET6_ADDRSTRLEN] = {0};
+ char dst_addr[INET6_ADDRSTRLEN] = {0};
+
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
@@ -419,15 +424,51 @@ void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_
stat->pkts_tx++;
stat->bytes_tx += len;
- stat->raw_pkts_tx++;
- stat->raw_bytes_tx += len;
+ if (packet_is_ctrl(pkt))
+ {
+ stat->ctrl_pkts_tx++;
+ stat->ctrl_bytes_tx += len;
+ }
+ else
+ {
+ stat->raw_pkts_tx++;
+ stat->raw_bytes_tx += len;
+ }
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)packet_get_origin_ctx(pkt);
if (pcap_pkt)
{
free(pcap_pkt);
}
- packet_free(pkt);
+ else
+ {
+ stat->pkts_injected++;
+ stat->bytes_injected += len;
+
+ memset(&tuple, 0, sizeof(struct tuple6));
+ packet_get_innermost_tuple6(pkt, &tuple);
+
+ if (tuple.addr_family == AF_INET)
+ {
+ inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN);
+ }
+ else
+ {
+ inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN);
+ }
+ snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected);
+
+ if (packet_dump_pcap(pkt, file) == -1)
+ {
+ PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file);
+ }
+ else
+ {
+ PCAP_IO_LOG_FATAL("dump inject packet: %s", file);
+ }
+ }
}
}
@@ -451,61 +492,6 @@ void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t
}
}
-uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
-{
- uint16_t len;
- struct tuple6 tuple;
- struct packet *pkt = NULL;
- struct pcap_io *pcap_io = (struct pcap_io *)handle;
- struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
-
- char file[PATH_MAX] = {0};
- char src_addr[INET6_ADDRSTRLEN] = {0};
- char dst_addr[INET6_ADDRSTRLEN] = {0};
-
- for (uint16_t i = 0; i < nr_pkts; i++)
- {
- pkt = &pkts[i];
- len = packet_get_raw_len(pkt);
-
- stat->pkts_injected++;
- stat->bytes_injected += len;
-
- stat->raw_pkts_tx++;
- stat->raw_bytes_tx += len;
-
- stat->pkts_tx++;
- stat->bytes_tx += len;
-
- memset(&tuple, 0, sizeof(struct tuple6));
- packet_get_innermost_tuple6(pkt, &tuple);
-
- if (tuple.addr_family == AF_INET)
- {
- inet_ntop(AF_INET, &tuple.src_addr.v4, src_addr, INET6_ADDRSTRLEN);
- inet_ntop(AF_INET, &tuple.dst_addr.v4, dst_addr, INET6_ADDRSTRLEN);
- }
- else
- {
- inet_ntop(AF_INET6, &tuple.src_addr.v6, src_addr, INET6_ADDRSTRLEN);
- inet_ntop(AF_INET6, &tuple.dst_addr.v6, dst_addr, INET6_ADDRSTRLEN);
- }
- snprintf(file, sizeof(file), "inject-%s:%u-%s:%u-%lu.pcap", src_addr, ntohs(tuple.src_port), dst_addr, ntohs(tuple.dst_port), stat->pkts_injected);
-
- if (packet_dump_pcap(pkt, file) == -1)
- {
- PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file);
- }
- else
- {
- PCAP_IO_LOG_FATAL("dump inject packet: %s", file);
- }
- packet_free(pkt);
- }
-
- return nr_pkts;
-}
-
void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)))
{
return;
diff --git a/infra/packet_io/pcap_io.h b/infra/packet_io/pcap_io.h
index 0bd03a8..6619f46 100644
--- a/infra/packet_io/pcap_io.h
+++ b/infra/packet_io/pcap_io.h
@@ -15,7 +15,6 @@ int pcap_io_init(void *handle, uint16_t thr_idx);
uint16_t pcap_io_ingress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void pcap_io_egress(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
void pcap_io_yield(void *handle, uint16_t thr_idx);
struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx);
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index b3e4eec..0d85a7e 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -100,10 +100,12 @@ static void *worker_thread(void *arg)
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
packet_io_drop(pkt_io, thread_id, pkt, 1);
+ packet_free(pkt);
}
else
{
packet_io_egress(pkt_io, thread_id, pkt, 1);
+ packet_free(pkt);
}
stellar_polling_dispatch(polling_mgr);
}
@@ -281,40 +283,6 @@ void stellar_reload_log_level(struct stellar *st)
}
}
-/******************************************************************************
- * Stellar Utility Function
- ******************************************************************************/
-
-// TODO
-#if 0
-// only send user build packet, can't send packet which come from network
-void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
-{
- uint16_t thread_id = stellar_module_manager_get_thread_id(st->st.mod_mgr);
- struct packet_io *pkt_io = st->st.pkt_io;
- struct session_manager_runtime *sess_mgr_rt = st->st.threads[thread_id].sess_mgr_rt;
- session_manager_runtime_record_duplicated_packet(sess_mgr_rt, pkt);
-
- if (packet_is_claim(pkt))
- {
- PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order");
- assert(0);
- return;
- }
-
- if (packet_get_origin_ctx(pkt))
- {
- // TODO
- abort();
- packet_io_egress(pkt_io, thread_id, pkt, 1);
- }
- else
- {
- packet_io_inject(pkt_io, thread_id, pkt, 1);
- }
-}
-#endif
-
struct logger *stellar_get_logger(struct stellar *st)
{
if (st)
diff --git a/infra/version.map b/infra/version.map
index e9008aa..a6163bc 100644
--- a/infra/version.map
+++ b/infra/version.map
@@ -42,7 +42,7 @@ global:
stellar_session_plugin_dettach_current_session;
stellar_packet_plugin_register;
stellar_polling_plugin_register;
- stellar_send_build_packet;
+
stellar_new;
stellar_run;
stellar_free;