summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2023-09-06 18:30:14 +0800
committerluwenpeng <[email protected]>2023-09-07 14:27:24 +0800
commite16b028be675ee35f4d08521accbc52ee6e6b182 (patch)
tree03ea80f57d4f2c2b98dd125fa9baa2cfd770dc88
parent142e30257e2d852b381f1ca47257095b888627cd (diff)
[refactor] Decouple packets from sessions/events
-rw-r--r--src/event/event.rs113
-rw-r--r--src/event/manager.rs13
-rw-r--r--src/lib.rs3
-rw-r--r--src/main.rs102
-rw-r--r--src/packet/error.rs7
-rw-r--r--src/packet/packet.rs339
-rw-r--r--src/plugin/example.rs23
-rw-r--r--src/session/manager.rs103
-rw-r--r--src/session/session.rs58
-rw-r--r--src/utils/mod.rs1
-rw-r--r--src/utils/utils.rs27
11 files changed, 354 insertions, 435 deletions
diff --git a/src/event/event.rs b/src/event/event.rs
index 8ab3918..e2e2842 100644
--- a/src/event/event.rs
+++ b/src/event/event.rs
@@ -4,137 +4,156 @@ use std::cell::RefCell;
use std::rc::Rc;
// L2 Event
-pub const BUILDIN_L2_EVENT: &str = "BUILDIN_L2_EVENT";
+pub const BUILTIN_L2_EVENT: &str = "BUILTIN_L2_EVENT";
// L3 Event
-pub const BUILDIN_L3_EVENT: &str = "BUILDIN_L3_EVENT";
-pub const BUILDIN_IP4_EVENT: &str = "BUILDIN_IP4_EVENT";
-pub const BUILDIN_IP6_EVENT: &str = "BUILDIN_IP6_EVENT";
+pub const BUILTIN_L3_EVENT: &str = "BUILTIN_L3_EVENT";
+pub const BUILTIN_IPV4_EVENT: &str = "BUILTIN_IPV4_EVENT";
+pub const BUILTIN_IPV6_EVENT: &str = "BUILTIN_IPV6_EVENT";
// L4 Event
-pub const BUILDIN_L4_EVENT: &str = "BUILDIN_L4_EVENT";
-pub const BUILDIN_TCP_OPENING_EVENT: &str = "BUILDIN_TCP_OPENING_EVENT";
-pub const BUILDIN_TCP_ACTIVE_EVENT: &str = "BUILDIN_TCP_ACTIVE_EVENT";
-pub const BUILDIN_TCP_EXPIRE_EVENT: &str = "BUILDIN_TCP_EXPIRE_EVENT";
-pub const BUILDIN_TCP_CLOSED_EVENT: &str = "BUILDIN_TCP_CLOSED_EVENT";
+pub const BUILTIN_L4_EVENT: &str = "BUILTIN_L4_EVENT";
-pub const BUILDIN_UDP_OPENING_EVENT: &str = "BUILDIN_UDP_OPENING_EVENT";
-pub const BUILDIN_UDP_ACTIVE_EVENT: &str = "BUILDIN_UDP_ACTIVE_EVENT";
-pub const BUILDIN_UDP_EXPIRE_EVENT: &str = "BUILDIN_UDP_EXPIRE_EVENT";
+pub const BUILTIN_TCP_EVENT: &str = "BUILTIN_TCP_EVENT";
+pub const BUILTIN_TCP_OPENING_EVENT: &str = "BUILTIN_TCP_OPENING_EVENT";
+pub const BUILTIN_TCP_ACTIVE_EVENT: &str = "BUILTIN_TCP_ACTIVE_EVENT";
+pub const BUILTIN_TCP_EXPIRE_EVENT: &str = "BUILTIN_TCP_EXPIRE_EVENT";
+pub const BUILTIN_TCP_CLOSED_EVENT: &str = "BUILTIN_TCP_CLOSED_EVENT";
+
+pub const BUILTIN_UDP_EVENT: &str = "BUILTIN_UDP_EVENT";
+pub const BUILTIN_UDP_OPENING_EVENT: &str = "BUILTIN_UDP_OPENING_EVENT";
+pub const BUILTIN_UDP_ACTIVE_EVENT: &str = "BUILTIN_UDP_ACTIVE_EVENT";
+pub const BUILTIN_UDP_EXPIRE_EVENT: &str = "BUILTIN_UDP_EXPIRE_EVENT";
// L7 Event
-pub const BUILDIN_L7_EVENT: &str = "BUILDIN_L7_EVENT";
-pub const BUILDIN_DNS_EVENT: &str = "BUILDIN_DNS_EVENT";
-pub const BUILDIN_HTTP_EVENT: &str = "BUILDIN_HTTP_EVENT";
+pub const BUILTIN_L7_EVENT: &str = "BUILTIN_L7_EVENT";
+pub const BUILTIN_DNS_EVENT: &str = "BUILTIN_DNS_EVENT";
+pub const BUILTIN_HTTP_EVENT: &str = "BUILTIN_HTTP_EVENT";
-pub struct BuildInEvent {}
+pub struct BuiltInEvent {}
-impl BuildInEvent {
+impl BuiltInEvent {
pub fn trigger_l2_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) {
- let buildin_l2_event = mgr.borrow_mut().event2index(BUILDIN_L2_EVENT);
- mgr.borrow_mut().trigger(buildin_l2_event, session);
+ let builtin_l2_event = mgr.borrow_mut().event2index(BUILTIN_L2_EVENT);
+ mgr.borrow_mut().trigger(builtin_l2_event, session);
}
pub fn trigger_l3_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) {
- let buildin_l3_event = mgr.borrow_mut().event2index(BUILDIN_L3_EVENT);
- mgr.borrow_mut().trigger(buildin_l3_event, session);
+ let builtin_l3_event = mgr.borrow_mut().event2index(BUILTIN_L3_EVENT);
+ mgr.borrow_mut().trigger(builtin_l3_event, session);
}
pub fn trigger_ip4_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_ip4_event = mgr.borrow_mut().event2index(BUILDIN_IP4_EVENT);
- mgr.borrow_mut().trigger(buildin_ip4_event, session);
+ let builtin_ip4_event = mgr.borrow_mut().event2index(BUILTIN_IPV4_EVENT);
+ mgr.borrow_mut().trigger(builtin_ip4_event, session);
}
pub fn trigger_ip6_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_ip6_event = mgr.borrow_mut().event2index(BUILDIN_IP6_EVENT);
- mgr.borrow_mut().trigger(buildin_ip6_event, session);
+ let builtin_ip6_event = mgr.borrow_mut().event2index(BUILTIN_IPV6_EVENT);
+ mgr.borrow_mut().trigger(builtin_ip6_event, session);
}
pub fn trigger_l4_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) {
- let buildin_l4_event = mgr.borrow_mut().event2index(BUILDIN_L4_EVENT);
- mgr.borrow_mut().trigger(buildin_l4_event, session);
+ let builtin_l4_event = mgr.borrow_mut().event2index(BUILTIN_L4_EVENT);
+ mgr.borrow_mut().trigger(builtin_l4_event, session);
+ }
+
+ pub fn trigger_tcp_event(
+ mgr: Rc<RefCell<EventManager>>,
+ session: Option<Rc<RefCell<Session>>>,
+ ) {
+ let builtin_tcp_event = mgr.borrow_mut().event2index(BUILTIN_TCP_EVENT);
+ mgr.borrow_mut().trigger(builtin_tcp_event, session);
}
pub fn trigger_tcp_opening_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_tcp_opening_event = mgr.borrow_mut().event2index(BUILDIN_TCP_OPENING_EVENT);
- mgr.borrow_mut().trigger(buildin_tcp_opening_event, session);
+ let builtin_tcp_opening_event = mgr.borrow_mut().event2index(BUILTIN_TCP_OPENING_EVENT);
+ mgr.borrow_mut().trigger(builtin_tcp_opening_event, session);
}
pub fn trigger_tcp_active_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_tcp_active_event = mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT);
- mgr.borrow_mut().trigger(buildin_tcp_active_event, session);
+ let builtin_tcp_active_event = mgr.borrow_mut().event2index(BUILTIN_TCP_ACTIVE_EVENT);
+ mgr.borrow_mut().trigger(builtin_tcp_active_event, session);
}
pub fn trigger_tcp_expire_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_tcp_expire_event = mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT);
- mgr.borrow_mut().trigger(buildin_tcp_expire_event, session);
+ let builtin_tcp_expire_event = mgr.borrow_mut().event2index(BUILTIN_TCP_EXPIRE_EVENT);
+ mgr.borrow_mut().trigger(builtin_tcp_expire_event, session);
}
pub fn trigger_tcp_closed_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_tcp_closed_event = mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT);
- mgr.borrow_mut().trigger(buildin_tcp_closed_event, session);
+ let builtin_tcp_closed_event = mgr.borrow_mut().event2index(BUILTIN_TCP_CLOSED_EVENT);
+ mgr.borrow_mut().trigger(builtin_tcp_closed_event, session);
+ }
+
+ pub fn trigger_udp_event(
+ mgr: Rc<RefCell<EventManager>>,
+ session: Option<Rc<RefCell<Session>>>,
+ ) {
+ let builtin_udp_event = mgr.borrow_mut().event2index(BUILTIN_UDP_EVENT);
+ mgr.borrow_mut().trigger(builtin_udp_event, session);
}
pub fn trigger_udp_opening_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_udp_opening_event = mgr.borrow_mut().event2index(BUILDIN_UDP_OPENING_EVENT);
- mgr.borrow_mut().trigger(buildin_udp_opening_event, session);
+ let builtin_udp_opening_event = mgr.borrow_mut().event2index(BUILTIN_UDP_OPENING_EVENT);
+ mgr.borrow_mut().trigger(builtin_udp_opening_event, session);
}
pub fn trigger_udp_active_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_udp_active_event = mgr.borrow_mut().event2index(BUILDIN_UDP_ACTIVE_EVENT);
- mgr.borrow_mut().trigger(buildin_udp_active_event, session);
+ let builtin_udp_active_event = mgr.borrow_mut().event2index(BUILTIN_UDP_ACTIVE_EVENT);
+ mgr.borrow_mut().trigger(builtin_udp_active_event, session);
}
pub fn trigger_udp_expire_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_udp_expire_event = mgr.borrow_mut().event2index(BUILDIN_UDP_EXPIRE_EVENT);
- mgr.borrow_mut().trigger(buildin_udp_expire_event, session);
+ let builtin_udp_expire_event = mgr.borrow_mut().event2index(BUILTIN_UDP_EXPIRE_EVENT);
+ mgr.borrow_mut().trigger(builtin_udp_expire_event, session);
}
pub fn trigger_l7_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) {
- let buildin_l7_event = mgr.borrow_mut().event2index(BUILDIN_L7_EVENT);
- mgr.borrow_mut().trigger(buildin_l7_event, session);
+ let builtin_l7_event = mgr.borrow_mut().event2index(BUILTIN_L7_EVENT);
+ mgr.borrow_mut().trigger(builtin_l7_event, session);
}
pub fn trigger_dns_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_dns_event = mgr.borrow_mut().event2index(BUILDIN_DNS_EVENT);
- mgr.borrow_mut().trigger(buildin_dns_event, session);
+ let builtin_dns_event = mgr.borrow_mut().event2index(BUILTIN_DNS_EVENT);
+ mgr.borrow_mut().trigger(builtin_dns_event, session);
}
pub fn trigger_http_event(
mgr: Rc<RefCell<EventManager>>,
session: Option<Rc<RefCell<Session>>>,
) {
- let buildin_http_event = mgr.borrow_mut().event2index(BUILDIN_HTTP_EVENT);
- mgr.borrow_mut().trigger(buildin_http_event, session);
+ let builtin_http_event = mgr.borrow_mut().event2index(BUILTIN_HTTP_EVENT);
+ mgr.borrow_mut().trigger(builtin_http_event, session);
}
}
diff --git a/src/event/manager.rs b/src/event/manager.rs
index 64975d6..867a77f 100644
--- a/src/event/manager.rs
+++ b/src/event/manager.rs
@@ -7,7 +7,12 @@ use std::rc::Rc;
pub trait EventHandle {
fn init(&mut self);
- fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>);
+ fn handle(
+ &mut self,
+ index: usize,
+ packet: Option<&Packet>,
+ session: Option<Rc<RefCell<Session>>>,
+ );
}
pub struct EventManager {
@@ -61,7 +66,7 @@ impl EventManager {
self.ready_event.push_back((index, session));
}
- pub fn dispatch(&mut self, packet: &Packet) {
+ pub fn dispatch(&mut self, packet: Option<&Packet>) {
loop {
if let Some(event) = self.ready_event.pop_front() {
println!("Dispatch event: {:?}", self.index2event.get(&event.0));
@@ -222,7 +227,7 @@ mod tests {
// Handle packet
let mut packet = Packet::new(&bytes, bytes.len() as u32);
- let result = packet.handle(thread_ctx.clone());
+ let result = packet.handle();
match result {
Ok(_v) => {
// println!("SUCCESS: {:?}, {:?}", packet, _v);
@@ -242,7 +247,7 @@ mod tests {
.borrow_mut()
.get_event_mgr()
.borrow_mut()
- .dispatch(&packet);
+ .dispatch(Some(&packet));
// assert_eq!(1, 0);
}
diff --git a/src/lib.rs b/src/lib.rs
index 045c280..7cde74b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,4 +3,5 @@ pub mod protocol;
pub mod event;
pub mod session;
pub mod plugin;
-pub mod thread; \ No newline at end of file
+pub mod thread;
+pub mod utils; \ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index a238862..c1286bd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,18 +1,88 @@
use std::cell::RefCell;
use std::rc::Rc;
+use stellar_rs::event::event::BuiltInEvent;
use stellar_rs::event::manager::EventHandle;
+use stellar_rs::event::manager::EventManager;
use stellar_rs::packet::capture::PacketCapture;
use stellar_rs::packet::packet::Packet;
+use stellar_rs::packet::packet::PacketEvent;
use stellar_rs::plugin::example::ExamplePulgin;
+use stellar_rs::session::session::Session;
+use stellar_rs::session::session::SessionProto;
+use stellar_rs::session::session::SessionState;
use stellar_rs::thread::thread::ThreadContex;
-fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
+fn trigger_packet_event(
+ packet: &Packet,
+ session: Option<Rc<RefCell<Session>>>,
+ event_mgr: Rc<RefCell<EventManager>>,
+) {
+ for packet_event in &packet.event {
+ match packet_event {
+ PacketEvent::L2_EVENT => {
+ BuiltInEvent::trigger_l2_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::L3_EVENT => {
+ BuiltInEvent::trigger_l3_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::IPV4_EVENT => {
+ BuiltInEvent::trigger_ip4_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::IPV6_EVENT => {
+ BuiltInEvent::trigger_ip6_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::L4_EVENT => {
+ BuiltInEvent::trigger_l4_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::TCP_EVENT => {
+ BuiltInEvent::trigger_tcp_event(event_mgr.clone(), session.clone());
+ }
+ PacketEvent::UDP_EVENT => {
+ BuiltInEvent::trigger_udp_event(event_mgr.clone(), session.clone());
+ }
+ }
+ }
+}
+
+fn trigger_session_event(
+ session: Option<Rc<RefCell<Session>>>,
+ event_mgr: Rc<RefCell<EventManager>>,
+) {
+ if session.is_none() {
+ return;
+ }
+
+ let session_state = session.clone().unwrap().borrow_mut().get_session_state();
+ let session_proto = session.clone().unwrap().borrow_mut().get_session_proto();
+
+ match session_state {
+ SessionState::New => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_opening_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_opening_event(event_mgr, session),
+ },
+ SessionState::Active => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_active_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_active_event(event_mgr, session),
+ },
+ SessionState::Inactive => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_closed_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session),
+ },
+ SessionState::Expired => match session_proto {
+ SessionProto::TCP => BuiltInEvent::trigger_tcp_expire_event(event_mgr, session),
+ SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session),
+ },
+ }
+}
+
+fn capture_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
let event_mgr = ctx.borrow_mut().get_event_mgr();
let session_mgr = ctx.borrow_mut().get_session_mgr();
- let empty = Packet::new(b"", 0);
let mut packet = Packet::new(data, len);
- let result = packet.handle(ctx);
+
+ // Handle Packet
+ let result = packet.handle();
match result {
Ok(_left) => {
// println!("SUCCESS: {:?}, {:?}", packet, left);
@@ -23,14 +93,28 @@ fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) {
// println!("ERROR Data: {:?}", packet);
// println!("ERROR Code: {:?}", e);
println!("ERROR Desc: {}", e);
+ return;
}
}
- // Hanlde Packet Event
- event_mgr.borrow_mut().dispatch(&packet);
- // Hanlde Expire Event
- session_mgr.borrow_mut().expire_sessions();
- event_mgr.borrow_mut().dispatch(&empty);
+ // Packets have sessions, triggering both packet events and session events
+ if packet.get_inner_tuple().is_some() {
+ let flow_id = packet.get_flow_id().unwrap();
+ let session = session_mgr.borrow_mut().update(flow_id);
+ trigger_packet_event(&packet, Some(session.clone()), event_mgr.clone());
+ trigger_session_event(Some(session.clone()), event_mgr.clone());
+ // Packets have no sessions, only packet events are triggered
+ } else {
+ trigger_packet_event(&packet, None, event_mgr.clone());
+ }
+
+ // Handle packet events and session events on the current package
+ event_mgr.borrow_mut().dispatch(Some(&packet));
+
+ // Handle expire events without packets
+ let session = session_mgr.borrow_mut().expire_oldest_session();
+ trigger_session_event(session, event_mgr.clone());
+ event_mgr.borrow_mut().dispatch(None);
}
fn main() {
@@ -40,5 +124,5 @@ fn main() {
PacketCapture::show_devices();
let mut cap = PacketCapture::new("en0");
- cap.poll_packet(packet_callback, thread_ctx);
+ cap.poll_packet(capture_callback, thread_ctx);
}
diff --git a/src/packet/error.rs b/src/packet/error.rs
index fe0620e..174930c 100644
--- a/src/packet/error.rs
+++ b/src/packet/error.rs
@@ -18,10 +18,6 @@ pub enum PacketError {
// L4
IncompleteUdpHeader,
IncompleteTcpHeader,
-
- // L7
- IncompleteAppHeader,
- UnsupportAppProtocol,
}
impl core::fmt::Display for PacketError {
@@ -40,9 +36,6 @@ impl core::fmt::Display for PacketError {
// L4
PacketError::IncompleteUdpHeader => write!(f, "Incomplete UDP Header"),
PacketError::IncompleteTcpHeader => write!(f, "Incomplete TCP Header"),
- // L7
- PacketError::IncompleteAppHeader => write!(f, "Incomplete App Header"),
- PacketError::UnsupportAppProtocol => write!(f, "Unsupport App Protocol"),
}
}
}
diff --git a/src/packet/packet.rs b/src/packet/packet.rs
index f80f4d6..2336946 100644
--- a/src/packet/packet.rs
+++ b/src/packet/packet.rs
@@ -1,38 +1,37 @@
-use crate::event::event::BuildInEvent;
use crate::packet::error::PacketError;
use crate::protocol::codec::Decode;
-use crate::protocol::dns::DNS_MESSAGE;
use crate::protocol::ethernet::EtherType;
use crate::protocol::ethernet::EthernetFrame;
-use crate::protocol::http::HTTP_MESSAGE;
use crate::protocol::ip::IPProtocol;
use crate::protocol::ipv4::IPv4Header;
use crate::protocol::ipv6::IPv6Header;
use crate::protocol::tcp::TcpHeader;
use crate::protocol::udp::UdpHeader;
-use crate::session::session::packet2session;
-use crate::session::session::Session;
-use crate::session::session::SessionProto;
-use crate::session::session::SessionState;
-use crate::thread::thread::ThreadContex;
-use std::cell::RefCell;
-use std::rc::Rc;
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq)]
pub enum Encapsulation<'a> {
L2_ETH(EthernetFrame, &'a [u8]),
- L3_IP4(IPv4Header, &'a [u8]),
- L3_IP6(IPv6Header, &'a [u8]),
+ L3_IPV4(IPv4Header, &'a [u8]),
+ L3_IPV6(IPv6Header, &'a [u8]),
L4_TCP(TcpHeader, &'a [u8]),
L4_UDP(UdpHeader, &'a [u8]),
- L7_DNS(DNS_MESSAGE, &'a [u8]),
- L7_HTTP(HTTP_MESSAGE, &'a [u8]),
+ UNSUPPORTED(&'a [u8]),
+}
- Unsupported(&'a [u8]),
+#[allow(non_camel_case_types)]
+#[derive(Clone, Debug, PartialEq)]
+pub enum PacketEvent {
+ L2_EVENT,
+ L3_EVENT,
+ IPV4_EVENT,
+ IPV6_EVENT,
+ L4_EVENT,
+ TCP_EVENT,
+ UDP_EVENT,
}
#[derive(Debug)]
@@ -40,6 +39,7 @@ pub struct Packet<'a> {
pub orig_data: &'a [u8],
pub orig_len: u32,
pub encapsulation: Vec<Encapsulation<'a>>,
+ pub event: Vec<PacketEvent>,
}
impl Packet<'_> {
@@ -48,24 +48,25 @@ impl Packet<'_> {
orig_data: data,
orig_len: len,
encapsulation: vec![],
+ event: vec![],
}
}
- pub fn handle(&mut self, ctx: Rc<RefCell<ThreadContex>>) -> Result<(), PacketError> {
+ pub fn handle(&mut self) -> Result<(), PacketError> {
if self.orig_data.len() != self.orig_len as usize {
return Err(PacketError::InvalidPacketLength);
}
- return handle_l2(self, self.orig_data, ctx);
+ return handle_l2(self, self.orig_data);
}
pub fn get_outer_l3_layer(&self) -> Option<Encapsulation> {
let num = self.encapsulation.len();
for i in 0..num {
match self.encapsulation[i] {
- Encapsulation::L3_IP4(_, _) => {
+ Encapsulation::L3_IPV4(_, _) => {
return Some(self.encapsulation[i].clone());
}
- Encapsulation::L3_IP6(_, _) => {
+ Encapsulation::L3_IPV6(_, _) => {
return Some(self.encapsulation[i].clone());
}
_ => continue,
@@ -79,10 +80,10 @@ impl Packet<'_> {
let num = self.encapsulation.len();
for i in (0..num).rev() {
match self.encapsulation[i] {
- Encapsulation::L3_IP4(_, _) => {
+ Encapsulation::L3_IPV4(_, _) => {
return Some(self.encapsulation[i].clone());
}
- Encapsulation::L3_IP6(_, _) => {
+ Encapsulation::L3_IPV6(_, _) => {
return Some(self.encapsulation[i].clone());
}
_ => continue,
@@ -130,13 +131,13 @@ impl Packet<'_> {
let num = self.encapsulation.len();
for i in 0..num {
match self.encapsulation[i] {
- Encapsulation::L3_IP4(ref header, _) => {
+ Encapsulation::L3_IPV4(ref header, _) => {
return Some((
header.source_address.to_string(),
header.dest_address.to_string(),
));
}
- Encapsulation::L3_IP6(ref header, _) => {
+ Encapsulation::L3_IPV6(ref header, _) => {
return Some((
header.source_address.to_string(),
header.dest_address.to_string(),
@@ -153,13 +154,13 @@ impl Packet<'_> {
let num = self.encapsulation.len();
for i in (0..num).rev() {
match self.encapsulation[i] {
- Encapsulation::L3_IP4(ref header, _) => {
+ Encapsulation::L3_IPV4(ref header, _) => {
return Some((
header.source_address.to_string(),
header.dest_address.to_string(),
));
}
- Encapsulation::L3_IP6(ref header, _) => {
+ Encapsulation::L3_IPV6(ref header, _) => {
return Some((
header.source_address.to_string(),
header.dest_address.to_string(),
@@ -213,7 +214,7 @@ impl Packet<'_> {
}
for i in 0..num - 1 {
match self.encapsulation[i] {
- Encapsulation::L3_IP4(ref l3_header, _) => match self.encapsulation[i + 1] {
+ Encapsulation::L3_IPV4(ref l3_header, _) => match self.encapsulation[i + 1] {
Encapsulation::L4_TCP(ref l4_header, _) => {
return Some((
l3_header.source_address.to_string(),
@@ -232,7 +233,7 @@ impl Packet<'_> {
}
_ => continue,
},
- Encapsulation::L3_IP6(ref l3_header, _) => match self.encapsulation[i + 1] {
+ Encapsulation::L3_IPV6(ref l3_header, _) => match self.encapsulation[i + 1] {
Encapsulation::L4_TCP(ref l4_header, _) => {
return Some((
l3_header.source_address.to_string(),
@@ -266,7 +267,7 @@ impl Packet<'_> {
for i in (1..num).rev() {
match self.encapsulation[i] {
Encapsulation::L4_TCP(ref l4_header, _) => match self.encapsulation[i - 1] {
- Encapsulation::L3_IP4(ref l3_header, _) => {
+ Encapsulation::L3_IPV4(ref l3_header, _) => {
return Some((
l3_header.source_address.to_string(),
l4_header.source_port,
@@ -274,7 +275,7 @@ impl Packet<'_> {
l4_header.dest_port,
));
}
- Encapsulation::L3_IP6(ref l3_header, _) => {
+ Encapsulation::L3_IPV6(ref l3_header, _) => {
return Some((
l3_header.source_address.to_string(),
l4_header.source_port,
@@ -285,7 +286,7 @@ impl Packet<'_> {
_ => continue,
},
Encapsulation::L4_UDP(ref l4_header, _) => match self.encapsulation[i - 1] {
- Encapsulation::L3_IP4(ref l3_header, _) => {
+ Encapsulation::L3_IPV4(ref l3_header, _) => {
return Some((
l3_header.source_address.to_string(),
l4_header.source_port,
@@ -293,7 +294,7 @@ impl Packet<'_> {
l4_header.dest_port,
));
}
- Encapsulation::L3_IP6(ref l3_header, _) => {
+ Encapsulation::L3_IPV6(ref l3_header, _) => {
return Some((
l3_header.source_address.to_string(),
l4_header.source_port,
@@ -310,116 +311,55 @@ impl Packet<'_> {
return None;
}
- pub fn get_trace_id(&self) -> Option<String> {
- let num = self.encapsulation.len();
- let mut trace_id = String::new();
- for i in 0..num {
- match self.encapsulation[i] {
- Encapsulation::L3_IP4(ref l3_header, _) => {
- trace_id.push_str("IP4->IP4;");
- trace_id.push_str(&l3_header.source_address.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l3_header.dest_address.to_string());
- trace_id.push_str(";");
- }
- Encapsulation::L3_IP6(ref l3_header, _) => {
- trace_id.push_str("IP6->IP6;");
- trace_id.push_str(&l3_header.source_address.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l3_header.dest_address.to_string());
- trace_id.push_str(";");
- }
- Encapsulation::L4_TCP(ref l4_header, _) => {
- trace_id.push_str("TCP->TCP;");
- trace_id.push_str(&l4_header.source_port.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l4_header.dest_port.to_string());
- trace_id.push_str(";");
- }
- Encapsulation::L4_UDP(ref l4_header, _) => {
- trace_id.push_str("UDP->UDP;");
- trace_id.push_str(&l4_header.source_port.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l4_header.dest_port.to_string());
- trace_id.push_str(";");
- }
- _ => continue,
- }
- }
-
- Some(trace_id)
- }
-
- pub fn get_reversed_trace_id(&self) -> Option<String> {
+ pub fn get_flow_id(&self) -> Option<String> {
let num = self.encapsulation.len();
- let mut trace_id = String::new();
+ let mut flow_id = String::new();
for i in 0..num {
match self.encapsulation[i] {
- Encapsulation::L3_IP4(ref l3_header, _) => {
- trace_id.push_str("IP4->IP4;");
- trace_id.push_str(&l3_header.dest_address.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l3_header.source_address.to_string());
- trace_id.push_str(";");
+ Encapsulation::L3_IPV4(ref l3_header, _) => {
+ flow_id.push_str(&l3_header.source_address.to_string());
+ flow_id.push_str("->");
+ flow_id.push_str(&l3_header.dest_address.to_string());
+ flow_id.push_str(";");
}
- Encapsulation::L3_IP6(ref l3_header, _) => {
- trace_id.push_str("IP6->IP6;");
- trace_id.push_str(&l3_header.dest_address.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l3_header.source_address.to_string());
- trace_id.push_str(";");
+ Encapsulation::L3_IPV6(ref l3_header, _) => {
+ flow_id.push_str(&l3_header.source_address.to_string());
+ flow_id.push_str("->");
+ flow_id.push_str(&l3_header.dest_address.to_string());
+ flow_id.push_str(";");
}
Encapsulation::L4_TCP(ref l4_header, _) => {
- trace_id.push_str("TCP->TCP;");
- trace_id.push_str(&l4_header.dest_port.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l4_header.source_port.to_string());
- trace_id.push_str(";");
+ flow_id.push_str("TCP->TCP;");
+ flow_id.push_str(&l4_header.source_port.to_string());
+ flow_id.push_str("->");
+ flow_id.push_str(&l4_header.dest_port.to_string());
+ flow_id.push_str(";");
}
Encapsulation::L4_UDP(ref l4_header, _) => {
- trace_id.push_str("UDP->UDP;");
- trace_id.push_str(&l4_header.dest_port.to_string());
- trace_id.push_str("->");
- trace_id.push_str(&l4_header.source_port.to_string());
- trace_id.push_str(";");
+ flow_id.push_str("UDP->UDP;");
+ flow_id.push_str(&l4_header.source_port.to_string());
+ flow_id.push_str("->");
+ flow_id.push_str(&l4_header.dest_port.to_string());
+ flow_id.push_str(";");
}
_ => continue,
}
}
- Some(trace_id)
+ Some(flow_id)
}
}
-pub fn reverse_trace_id(trace_id: &String) -> String {
- let mut reversed_trace_id = String::new();
- let mut trace_id_vec: Vec<&str> = trace_id.split(";").collect();
- trace_id_vec.pop();
- for item in trace_id_vec.iter() {
- let mut item_vec: Vec<&str> = item.split("->").collect();
- item_vec.reverse();
- reversed_trace_id.push_str(&item_vec.join("->"));
- reversed_trace_id.push_str(";");
- }
- reversed_trace_id
-}
-
-fn handle_l2<'a>(
- packet: &mut Packet<'a>,
- input: &'a [u8],
- ctx: Rc<RefCell<ThreadContex>>,
-) -> Result<(), PacketError> {
- let event_mgr = ctx.borrow().get_event_mgr();
+fn handle_l2<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> {
+ packet.event.push(PacketEvent::L2_EVENT);
let result = EthernetFrame::decode(input);
if let Ok((payload, header)) = result {
dbg!(&header);
packet
.encapsulation
.push(Encapsulation::L2_ETH(header, payload));
- BuildInEvent::trigger_l2_event(event_mgr, None);
- return handle_l3(packet, payload, header.ether_type, ctx);
+ return handle_l3(packet, payload, header.ether_type);
} else {
- BuildInEvent::trigger_l2_event(event_mgr, None);
return Err(PacketError::IncompleteEthernetFrame);
}
}
@@ -428,9 +368,8 @@ fn handle_l3<'a>(
packet: &mut Packet<'a>,
input: &'a [u8],
next_proto: EtherType,
- ctx: Rc<RefCell<ThreadContex>>,
) -> Result<(), PacketError> {
- let event_mgr = ctx.borrow().get_event_mgr();
+ packet.event.push(PacketEvent::L3_EVENT);
match next_proto {
EtherType::IPv4 => {
let result = IPv4Header::decode(input);
@@ -443,15 +382,13 @@ fn handle_l3<'a>(
packet
.encapsulation
- .push(Encapsulation::L3_IP4(header, payload));
+ .push(Encapsulation::L3_IPV4(header, payload));
// TODO IPv4 Fragment
- BuildInEvent::trigger_l3_event(event_mgr.clone(), None);
- BuildInEvent::trigger_ip4_event(event_mgr, None);
- return handle_l4(packet, payload, header.protocol, ctx);
+ packet.event.push(PacketEvent::IPV4_EVENT);
+ return handle_l4(packet, payload, header.protocol);
} else {
- BuildInEvent::trigger_l3_event(event_mgr, None);
return Err(PacketError::IncompleteIpv4Header);
}
}
@@ -466,20 +403,17 @@ fn handle_l3<'a>(
packet
.encapsulation
- .push(Encapsulation::L3_IP6(header, payload));
+ .push(Encapsulation::L3_IPV6(header, payload));
// TODO IPv6 Fragment
- BuildInEvent::trigger_l3_event(event_mgr.clone(), None);
- BuildInEvent::trigger_ip6_event(event_mgr, None);
- return handle_l4(packet, payload, header.next_header, ctx);
+ packet.event.push(PacketEvent::IPV6_EVENT);
+ return handle_l4(packet, payload, header.next_header);
} else {
- BuildInEvent::trigger_l3_event(event_mgr, None);
return Err(PacketError::IncompleteIpv6Header);
}
}
_e => {
- BuildInEvent::trigger_l3_event(event_mgr, None);
return Err(PacketError::UnsupportEthernetType);
}
}
@@ -489,10 +423,8 @@ fn handle_l4<'a>(
packet: &mut Packet<'a>,
input: &'a [u8],
next_proto: IPProtocol,
- ctx: Rc<RefCell<ThreadContex>>,
) -> Result<(), PacketError> {
- let event_mgr = ctx.borrow().get_event_mgr();
- let session_mgr = ctx.borrow().get_session_mgr();
+ packet.event.push(PacketEvent::L4_EVENT);
match next_proto {
IPProtocol::UDP => {
let result = UdpHeader::decode(input);
@@ -502,23 +434,9 @@ fn handle_l4<'a>(
.encapsulation
.push(Encapsulation::L4_UDP(header, payload));
- let session = packet2session(&packet, session_mgr);
- BuildInEvent::trigger_l4_event(event_mgr.clone(), Some(session.clone()));
- session.borrow_mut().set_session_proto(SessionProto::UDP);
- match session.borrow().get_session_state() {
- SessionState::New => {
- BuildInEvent::trigger_udp_opening_event(event_mgr, Some(session.clone()));
- }
- SessionState::Active => {
- BuildInEvent::trigger_udp_active_event(event_mgr, Some(session.clone()));
- }
- SessionState::Inactive | SessionState::Expired => {
- BuildInEvent::trigger_udp_expire_event(event_mgr, Some(session.clone()));
- }
- }
- return handle_l7(packet, payload, session, ctx);
+ packet.event.push(PacketEvent::UDP_EVENT);
+ return Ok(());
} else {
- BuildInEvent::trigger_l4_event(event_mgr, None);
return Err(PacketError::IncompleteUdpHeader);
}
}
@@ -527,119 +445,24 @@ fn handle_l4<'a>(
if let Ok((payload, header)) = result {
dbg!(&header);
- let tcp_need_closed = header.flag_rst || header.flag_fin;
packet
.encapsulation
.push(Encapsulation::L4_TCP(header, payload));
// TODO TCP Reassembly
- let session = packet2session(&packet, session_mgr);
- BuildInEvent::trigger_l4_event(event_mgr.clone(), Some(session.clone()));
- if tcp_need_closed {
- session
- .borrow_mut()
- .set_session_state(SessionState::Inactive);
- }
- session.borrow_mut().set_session_proto(SessionProto::TCP);
- match session.borrow().get_session_state() {
- SessionState::New => {
- BuildInEvent::trigger_tcp_opening_event(event_mgr, Some(session.clone()));
- }
- SessionState::Active => {
- BuildInEvent::trigger_tcp_active_event(event_mgr, Some(session.clone()));
- }
- SessionState::Expired => {
- BuildInEvent::trigger_tcp_expire_event(event_mgr, Some(session.clone()));
- }
- SessionState::Inactive => {
- BuildInEvent::trigger_tcp_closed_event(event_mgr, Some(session.clone()));
- }
- }
-
- return handle_l7(packet, payload, session, ctx);
+ packet.event.push(PacketEvent::TCP_EVENT);
+ return Ok(());
} else {
- BuildInEvent::trigger_l4_event(event_mgr, None);
return Err(PacketError::IncompleteTcpHeader);
}
}
_e => {
- BuildInEvent::trigger_l4_event(event_mgr, None);
return Err(PacketError::UnsupportIPProtocol);
}
}
}
-fn handle_l7<'a>(
- packet: &mut Packet<'a>,
- input: &'a [u8],
- session: Rc<RefCell<Session>>,
- ctx: Rc<RefCell<ThreadContex>>,
-) -> Result<(), PacketError> {
- let event_mgr = ctx.borrow().get_event_mgr();
- let next_proto = l7_identify(packet);
- match next_proto {
- L7Protocol::DNS => {
- let result = DNS_MESSAGE::decode(input);
- if let Ok((payload, header)) = result {
- dbg!(&header);
- packet
- .encapsulation
- .push(Encapsulation::L7_DNS(header, payload));
- BuildInEvent::trigger_l7_event(event_mgr.clone(), Some(session.clone()));
- BuildInEvent::trigger_dns_event(event_mgr, Some(session));
- return Ok(());
- } else {
- BuildInEvent::trigger_l7_event(event_mgr, Some(session));
- return Err(PacketError::IncompleteAppHeader);
- }
- }
- L7Protocol::HTTP => {
- let result = HTTP_MESSAGE::decode(input);
- if let Ok((payload, header)) = result {
- dbg!(&header);
- packet
- .encapsulation
- .push(Encapsulation::L7_HTTP(header, payload));
- BuildInEvent::trigger_l7_event(event_mgr.clone(), Some(session.clone()));
- BuildInEvent::trigger_http_event(event_mgr, Some(session));
- return Ok(());
- } else {
- BuildInEvent::trigger_l7_event(event_mgr, Some(session));
- return Err(PacketError::IncompleteAppHeader);
- }
- }
- L7Protocol::Unsupported => {
- BuildInEvent::trigger_l7_event(event_mgr, Some(session));
- return Err(PacketError::UnsupportAppProtocol);
- }
- }
-}
-
-enum L7Protocol {
- DNS,
- HTTP,
- Unsupported,
-}
-
-fn l7_identify(packet: &Packet) -> L7Protocol {
- let option = packet.get_inner_port();
- if option.is_none() {
- return L7Protocol::Unsupported;
- }
-
- let (src_port, dst_port) = option.unwrap();
- if src_port == 80 || dst_port == 80 {
- return L7Protocol::HTTP;
- }
-
- if src_port == 53 || dst_port == 53 {
- return L7Protocol::DNS;
- }
-
- return L7Protocol::Unsupported;
-}
-
/******************************************************************************
* TEST
******************************************************************************/
@@ -648,17 +471,13 @@ fn l7_identify(packet: &Packet) -> L7Protocol {
mod tests {
use super::Encapsulation;
use super::Packet;
- use crate::packet::packet::reverse_trace_id;
use crate::protocol::ip::IPProtocol;
use crate::protocol::ipv4::IPv4Header;
use crate::protocol::ipv6::IPv6Header;
use crate::protocol::tcp::TcpHeader;
use crate::protocol::udp::UdpHeader;
- use crate::thread::thread::ThreadContex;
- use std::cell::RefCell;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
- use std::rc::Rc;
#[test]
fn test_packet_handle1() {
@@ -756,9 +575,8 @@ mod tests {
0x00, 0x01, 0x00, 0x01, /* DNS END */
];
- let thread_ctx = Rc::new(RefCell::new(ThreadContex::new()));
let mut packet = Packet::new(&bytes, bytes.len() as u32);
- let result = packet.handle(thread_ctx);
+ let result = packet.handle();
match result {
Ok(v) => {
@@ -898,9 +716,8 @@ mod tests {
0x2a, 0x0d, 0x0a, 0x0d, 0x0a, /* HTTP END */
];
- let thread_ctx = Rc::new(RefCell::new(ThreadContex::new()));
let mut packet = Packet::new(&bytes, bytes.len() as u32);
- let result = packet.handle(thread_ctx);
+ let result = packet.handle();
match result {
Ok(v) => {
@@ -976,13 +793,13 @@ mod tests {
packet
.encapsulation
- .push(Encapsulation::L3_IP4(ipv4_hdr.clone(), b"1"));
+ .push(Encapsulation::L3_IPV4(ipv4_hdr.clone(), b"1"));
packet
.encapsulation
.push(Encapsulation::L4_TCP(tcp_hdr.clone(), b"2"));
packet
.encapsulation
- .push(Encapsulation::L3_IP6(ipv6_hdr.clone(), b"3"));
+ .push(Encapsulation::L3_IPV6(ipv6_hdr.clone(), b"3"));
packet
.encapsulation
.push(Encapsulation::L4_UDP(udp_hdr.clone(), b"4"));
@@ -1023,11 +840,11 @@ mod tests {
assert_eq!(
packet.get_outer_l3_layer(),
- Some(Encapsulation::L3_IP4(ipv4_hdr, b"1"))
+ Some(Encapsulation::L3_IPV4(ipv4_hdr, b"1"))
);
assert_eq!(
packet.get_inner_l3_layer(),
- Some(Encapsulation::L3_IP6(ipv6_hdr, b"3"))
+ Some(Encapsulation::L3_IPV6(ipv6_hdr, b"3"))
);
assert_eq!(
packet.get_outer_l4_layer(),
@@ -1038,8 +855,6 @@ mod tests {
Some(Encapsulation::L4_UDP(udp_hdr, b"4"))
);
- assert_eq!(packet.get_trace_id(), Some("IP4->IP4;192.168.0.101->121.14.154.93;TCP->TCP;50081->443;IP6->IP6;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string()));
- assert_eq!(packet.get_reversed_trace_id() ,Some("IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()));
- assert_eq!(reverse_trace_id(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string());
+ assert_eq!(packet.get_flow_id(), Some("192.168.0.101->121.14.154.93;TCP->TCP;50081->443;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string()));
}
}
diff --git a/src/plugin/example.rs b/src/plugin/example.rs
index eb38c2c..a3ca852 100644
--- a/src/plugin/example.rs
+++ b/src/plugin/example.rs
@@ -44,22 +44,22 @@ impl EventHandle for ExamplePulgin {
self.tcp_opening_event = event_mgr
.borrow_mut()
- .event2index(BUILDIN_TCP_OPENING_EVENT);
+ .event2index(BUILTIN_TCP_OPENING_EVENT);
event_mgr
.borrow_mut()
.register(self.tcp_opening_event, Box::new(self.clone()));
- self.tcp_active_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT);
+ self.tcp_active_event = event_mgr.borrow_mut().event2index(BUILTIN_TCP_ACTIVE_EVENT);
event_mgr
.borrow_mut()
.register(self.tcp_active_event, Box::new(self.clone()));
- self.tcp_expire_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT);
+ self.tcp_expire_event = event_mgr.borrow_mut().event2index(BUILTIN_TCP_EXPIRE_EVENT);
event_mgr
.borrow_mut()
.register(self.tcp_expire_event, Box::new(self.clone()));
- self.tcp_closed_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT);
+ self.tcp_closed_event = event_mgr.borrow_mut().event2index(BUILTIN_TCP_CLOSED_EVENT);
event_mgr
.borrow_mut()
.register(self.tcp_closed_event, Box::new(self.clone()));
@@ -72,7 +72,12 @@ impl EventHandle for ExamplePulgin {
.register(self.http_opening_event, Box::new(self.clone()));
}
- fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>) {
+ fn handle(
+ &mut self,
+ index: usize,
+ packet: Option<&Packet>,
+ session: Option<Rc<RefCell<Session>>>,
+ ) {
if session.is_none() {
return;
}
@@ -83,22 +88,22 @@ impl EventHandle for ExamplePulgin {
let session = session.unwrap();
if index == self.tcp_opening_event {
println!(
- "{} handle BUILDIN_TCP_OPENING_EVENT: {:?}",
+ "{} handle BUILTIN_TCP_OPENING_EVENT: {:?}",
self.plugin_name, session
);
} else if index == self.tcp_active_event {
println!(
- "{} handle BUILDIN_TCP_ACTIVE_EVENT: {:?}",
+ "{} handle BUILTIN_TCP_ACTIVE_EVENT: {:?}",
self.plugin_name, session
);
} else if index == self.tcp_expire_event {
println!(
- "{} handle BUILDIN_TCP_EXPIRE_EVENT: {:?}",
+ "{} handle BUILTIN_TCP_EXPIRE_EVENT: {:?}",
self.plugin_name, session
);
} else if index == self.tcp_closed_event {
println!(
- "{} handle BUILDIN_TCP_CLOSED_EVENT: {:?}",
+ "{} handle BUILTIN_TCP_CLOSED_EVENT: {:?}",
self.plugin_name, session
);
} else if index == self.http_opening_event {
diff --git a/src/session/manager.rs b/src/session/manager.rs
index 60222d6..462949f 100644
--- a/src/session/manager.rs
+++ b/src/session/manager.rs
@@ -1,7 +1,7 @@
-use crate::session::session::reverse_session_id;
use crate::session::session::Session;
-use crate::session::session::SessionProto;
+use crate::session::session::SessionDirection;
use crate::session::session::SessionState;
+use crate::utils::utils::reverse_flow_id;
use chrono::Utc;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -27,13 +27,47 @@ impl SessionManager {
}
}
+ pub fn update(&mut self, session_id: String) -> Rc<RefCell<Session>> {
+ let result = self.get_session(&session_id);
+ if result.is_none() {
+ // New Session
+ let mut new_session = Session::new(session_id.clone());
+ new_session.update_session_current_dir(SessionDirection::C2S);
+
+ let rc_new_session = Rc::new(RefCell::new(new_session));
+ self.insert_session(session_id.clone(), rc_new_session.clone());
+ rc_new_session
+ } else {
+ // Update Session
+ let rc_old_session = result.unwrap();
+ rc_old_session
+ .borrow_mut()
+ .set_session_state(SessionState::Active);
+ rc_old_session.borrow_mut().update_session_last_ts();
+ rc_old_session.borrow_mut().update_session_expire_ts();
+
+ // Update Session Direction
+ if rc_old_session.borrow().get_session_id() == session_id {
+ rc_old_session
+ .borrow_mut()
+ .update_session_current_dir(SessionDirection::C2S);
+ } else {
+ rc_old_session
+ .borrow_mut()
+ .update_session_current_dir(SessionDirection::S2C);
+ }
+
+ rc_old_session
+ }
+ }
+
pub fn get_session(&self, session_id: &String) -> Option<Rc<RefCell<Session>>> {
let result = self.sessions.get(session_id).map(|session| session.clone());
if result.is_some() {
return result;
}
- let reversed_id = reverse_session_id(session_id);
+ let reversed_id = reverse_flow_id(session_id);
self.sessions
.get(reversed_id.as_str())
.map(|session| session.clone())
@@ -49,39 +83,38 @@ impl SessionManager {
return result;
}
- let reversed_id = reverse_session_id(&session_id.to_string());
+ let reversed_id = reverse_flow_id(&session_id.to_string());
self.sessions.remove(reversed_id.as_str())
}
- pub fn expire_sessions(&mut self) {
- let now = Utc::now().timestamp();
- let mut expired_sessions = Vec::with_capacity(1024);
- for (session_id, session) in &self.sessions {
- if session.borrow().get_session_expire_ts() < now {
- expired_sessions.push(session_id.clone());
+ pub fn expire_oldest_session(&mut self) -> Option<Rc<RefCell<Session>>> {
+ let mut oldest_session: Option<Rc<RefCell<Session>>> = None;
+ let mut oldest_session_expire_ts = Utc::now().timestamp();
+
+ for (_, session) in &self.sessions {
+ let session_expire_ts = session.borrow().get_session_expire_ts();
+ if session_expire_ts < oldest_session_expire_ts {
+ oldest_session = Some(session.clone());
+ oldest_session_expire_ts = session_expire_ts;
}
}
- for session_id in expired_sessions {
- let option = self.remove_session(&session_id);
- if let Some(session) = option {
- println!("Session expired: {}", session_id);
- session
- .borrow_mut()
- .set_session_state(SessionState::Expired);
- session
- .borrow_mut()
- .set_session_end_ts(Utc::now().timestamp());
-
- match session.borrow().get_session_proto() {
- SessionProto::TCP => {
- //BuildInEvent::trigger_tcp_expire_event(event_mgr, Some(session));
- }
- SessionProto::UDP => {
- //BuildInEvent::trigger_udp_expire_event(event_mgr, Some(session));
- }
- }
- }
+
+ if oldest_session.is_some() {
+ let oldest_session_id = oldest_session.clone().unwrap().borrow().get_session_id();
+ oldest_session
+ .clone()
+ .unwrap()
+ .borrow_mut()
+ .set_session_state(SessionState::Expired);
+ oldest_session
+ .clone()
+ .unwrap()
+ .borrow_mut()
+ .set_session_end_ts(oldest_session_expire_ts);
+ self.remove_session(&oldest_session_id);
}
+
+ oldest_session
}
}
@@ -96,12 +129,10 @@ mod tests {
use std::cell::RefCell;
use std::rc::Rc;
- // use std::{thread, time};
-
#[test]
fn test_session_manager() {
- let session_id = "IP4->IP4;192.168.0.1->192.168.0.2;UDP->UDP;2345->80;".to_string();
- let reversed_id = "IP4->IP4;192.168.0.2->192.168.0.1;UDP->UDP;80->2345;".to_string();
+ let session_id = "192.168.0.1->192.168.0.2;UDP->UDP;2345->80;".to_string();
+ let reversed_id = "192.168.0.2->192.168.0.1;UDP->UDP;80->2345;".to_string();
let session = Rc::new(RefCell::new(Session::new(session_id.clone())));
// Create Session Manager
@@ -117,10 +148,6 @@ mod tests {
assert_eq!(session_mgr.get_session(&session_id).is_some(), true);
assert_eq!(session_mgr.get_session(&reversed_id).is_some(), true);
- // Expire Session
- // thread::sleep(time::Duration::from_secs(61));
- // session_mgr.expire_sessions();
-
// Delete session
assert_eq!(session_mgr.remove_session(&reversed_id).is_some(), true);
diff --git a/src/session/session.rs b/src/session/session.rs
index adbdb64..e8b2be4 100644
--- a/src/session/session.rs
+++ b/src/session/session.rs
@@ -1,10 +1,5 @@
-use crate::packet::packet::reverse_trace_id;
-use crate::packet::packet::Packet;
-use crate::session::manager::SessionManager;
use chrono::Utc;
-use std::cell::RefCell;
use std::collections::HashMap;
-use std::rc::Rc;
/******************************************************************************
* Struct
@@ -200,59 +195,6 @@ impl Session {
}
/******************************************************************************
- * Utils API
- ******************************************************************************/
-
-pub fn reverse_session_id(session_id: &String) -> String {
- reverse_trace_id(session_id)
-}
-
-pub fn packet2session(
- packet: &Packet,
- session_mgr: Rc<RefCell<SessionManager>>,
-) -> Rc<RefCell<Session>> {
- let rc_session;
- let packet_len = packet.orig_len as u64;
-
- let session_id = packet.get_trace_id().unwrap();
- let option = session_mgr.borrow().get_session(&session_id);
- if option.is_none() {
- let mut session = Session::new(session_id.clone());
- session.inc_session_c2s_metrics(0, 0, 1, packet_len);
- session.update_session_current_dir(SessionDirection::C2S);
- rc_session = Rc::new(RefCell::new(session));
- session_mgr
- .borrow_mut()
- .insert_session(session_id, rc_session.clone());
- } else {
- rc_session = option.unwrap();
- rc_session
- .borrow_mut()
- .set_session_state(SessionState::Active);
-
- if rc_session.borrow().get_session_id() == session_id {
- rc_session
- .borrow_mut()
- .inc_session_c2s_metrics(0, 0, 1, packet_len);
- rc_session
- .borrow_mut()
- .update_session_current_dir(SessionDirection::C2S);
- } else {
- rc_session
- .borrow_mut()
- .inc_session_s2c_metrics(0, 0, 1, packet_len);
- rc_session
- .borrow_mut()
- .update_session_current_dir(SessionDirection::S2C);
- }
- rc_session.borrow_mut().update_session_last_ts();
- rc_session.borrow_mut().update_session_expire_ts();
- }
-
- rc_session
-}
-
-/******************************************************************************
* TEST
******************************************************************************/
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
new file mode 100644
index 0000000..b5614dd
--- /dev/null
+++ b/src/utils/mod.rs
@@ -0,0 +1 @@
+pub mod utils;
diff --git a/src/utils/utils.rs b/src/utils/utils.rs
new file mode 100644
index 0000000..e784b34
--- /dev/null
+++ b/src/utils/utils.rs
@@ -0,0 +1,27 @@
+pub fn reverse_flow_id(flow_id: &String) -> String {
+ let mut reversed_flow_id = String::new();
+ let mut flow_id_vec: Vec<&str> = flow_id.split(";").collect();
+ flow_id_vec.pop();
+ for item in flow_id_vec.iter() {
+ let mut item_vec: Vec<&str> = item.split("->").collect();
+ item_vec.reverse();
+ reversed_flow_id.push_str(&item_vec.join("->"));
+ reversed_flow_id.push_str(";");
+ }
+ reversed_flow_id
+}
+
+/******************************************************************************
+ * TEST
+ ******************************************************************************/
+
+#[cfg(test)]
+mod tests {
+ use super::reverse_flow_id;
+
+ #[test]
+ fn test_reverse_flow_id() {
+ let flow_id = "192.168.0.101->121.14.154.93;TCP->TCP;50081->443;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string();
+ assert_eq!(reverse_flow_id(&flow_id), "121.14.154.93->192.168.0.101;TCP->TCP;443->50081;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string());
+ }
+}