diff options
| author | luwenpeng <[email protected]> | 2023-09-06 18:30:14 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2023-09-07 14:27:24 +0800 |
| commit | e16b028be675ee35f4d08521accbc52ee6e6b182 (patch) | |
| tree | 03ea80f57d4f2c2b98dd125fa9baa2cfd770dc88 | |
| parent | 142e30257e2d852b381f1ca47257095b888627cd (diff) | |
[refactor] Decouple packets from sessions/events
| -rw-r--r-- | src/event/event.rs | 113 | ||||
| -rw-r--r-- | src/event/manager.rs | 13 | ||||
| -rw-r--r-- | src/lib.rs | 3 | ||||
| -rw-r--r-- | src/main.rs | 102 | ||||
| -rw-r--r-- | src/packet/error.rs | 7 | ||||
| -rw-r--r-- | src/packet/packet.rs | 339 | ||||
| -rw-r--r-- | src/plugin/example.rs | 23 | ||||
| -rw-r--r-- | src/session/manager.rs | 103 | ||||
| -rw-r--r-- | src/session/session.rs | 58 | ||||
| -rw-r--r-- | src/utils/mod.rs | 1 | ||||
| -rw-r--r-- | src/utils/utils.rs | 27 |
11 files changed, 354 insertions, 435 deletions
diff --git a/src/event/event.rs b/src/event/event.rs index 8ab3918..e2e2842 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -4,137 +4,156 @@ use std::cell::RefCell; use std::rc::Rc; // L2 Event -pub const BUILDIN_L2_EVENT: &str = "BUILDIN_L2_EVENT"; +pub const BUILTIN_L2_EVENT: &str = "BUILTIN_L2_EVENT"; // L3 Event -pub const BUILDIN_L3_EVENT: &str = "BUILDIN_L3_EVENT"; -pub const BUILDIN_IP4_EVENT: &str = "BUILDIN_IP4_EVENT"; -pub const BUILDIN_IP6_EVENT: &str = "BUILDIN_IP6_EVENT"; +pub const BUILTIN_L3_EVENT: &str = "BUILTIN_L3_EVENT"; +pub const BUILTIN_IPV4_EVENT: &str = "BUILTIN_IPV4_EVENT"; +pub const BUILTIN_IPV6_EVENT: &str = "BUILTIN_IPV6_EVENT"; // L4 Event -pub const BUILDIN_L4_EVENT: &str = "BUILDIN_L4_EVENT"; -pub const BUILDIN_TCP_OPENING_EVENT: &str = "BUILDIN_TCP_OPENING_EVENT"; -pub const BUILDIN_TCP_ACTIVE_EVENT: &str = "BUILDIN_TCP_ACTIVE_EVENT"; -pub const BUILDIN_TCP_EXPIRE_EVENT: &str = "BUILDIN_TCP_EXPIRE_EVENT"; -pub const BUILDIN_TCP_CLOSED_EVENT: &str = "BUILDIN_TCP_CLOSED_EVENT"; +pub const BUILTIN_L4_EVENT: &str = "BUILTIN_L4_EVENT"; -pub const BUILDIN_UDP_OPENING_EVENT: &str = "BUILDIN_UDP_OPENING_EVENT"; -pub const BUILDIN_UDP_ACTIVE_EVENT: &str = "BUILDIN_UDP_ACTIVE_EVENT"; -pub const BUILDIN_UDP_EXPIRE_EVENT: &str = "BUILDIN_UDP_EXPIRE_EVENT"; +pub const BUILTIN_TCP_EVENT: &str = "BUILTIN_TCP_EVENT"; +pub const BUILTIN_TCP_OPENING_EVENT: &str = "BUILTIN_TCP_OPENING_EVENT"; +pub const BUILTIN_TCP_ACTIVE_EVENT: &str = "BUILTIN_TCP_ACTIVE_EVENT"; +pub const BUILTIN_TCP_EXPIRE_EVENT: &str = "BUILTIN_TCP_EXPIRE_EVENT"; +pub const BUILTIN_TCP_CLOSED_EVENT: &str = "BUILTIN_TCP_CLOSED_EVENT"; + +pub const BUILTIN_UDP_EVENT: &str = "BUILTIN_UDP_EVENT"; +pub const BUILTIN_UDP_OPENING_EVENT: &str = "BUILTIN_UDP_OPENING_EVENT"; +pub const BUILTIN_UDP_ACTIVE_EVENT: &str = "BUILTIN_UDP_ACTIVE_EVENT"; +pub const BUILTIN_UDP_EXPIRE_EVENT: &str = "BUILTIN_UDP_EXPIRE_EVENT"; // L7 Event -pub const BUILDIN_L7_EVENT: &str = "BUILDIN_L7_EVENT"; -pub const BUILDIN_DNS_EVENT: &str = "BUILDIN_DNS_EVENT"; -pub const BUILDIN_HTTP_EVENT: &str = "BUILDIN_HTTP_EVENT"; +pub const BUILTIN_L7_EVENT: &str = "BUILTIN_L7_EVENT"; +pub const BUILTIN_DNS_EVENT: &str = "BUILTIN_DNS_EVENT"; +pub const BUILTIN_HTTP_EVENT: &str = "BUILTIN_HTTP_EVENT"; -pub struct BuildInEvent {} +pub struct BuiltInEvent {} -impl BuildInEvent { +impl BuiltInEvent { pub fn trigger_l2_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { - let buildin_l2_event = mgr.borrow_mut().event2index(BUILDIN_L2_EVENT); - mgr.borrow_mut().trigger(buildin_l2_event, session); + let builtin_l2_event = mgr.borrow_mut().event2index(BUILTIN_L2_EVENT); + mgr.borrow_mut().trigger(builtin_l2_event, session); } pub fn trigger_l3_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { - let buildin_l3_event = mgr.borrow_mut().event2index(BUILDIN_L3_EVENT); - mgr.borrow_mut().trigger(buildin_l3_event, session); + let builtin_l3_event = mgr.borrow_mut().event2index(BUILTIN_L3_EVENT); + mgr.borrow_mut().trigger(builtin_l3_event, session); } pub fn trigger_ip4_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_ip4_event = mgr.borrow_mut().event2index(BUILDIN_IP4_EVENT); - mgr.borrow_mut().trigger(buildin_ip4_event, session); + let builtin_ip4_event = mgr.borrow_mut().event2index(BUILTIN_IPV4_EVENT); + mgr.borrow_mut().trigger(builtin_ip4_event, session); } pub fn trigger_ip6_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_ip6_event = mgr.borrow_mut().event2index(BUILDIN_IP6_EVENT); - mgr.borrow_mut().trigger(buildin_ip6_event, session); + let builtin_ip6_event = mgr.borrow_mut().event2index(BUILTIN_IPV6_EVENT); + mgr.borrow_mut().trigger(builtin_ip6_event, session); } pub fn trigger_l4_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { - let buildin_l4_event = mgr.borrow_mut().event2index(BUILDIN_L4_EVENT); - mgr.borrow_mut().trigger(buildin_l4_event, session); + let builtin_l4_event = mgr.borrow_mut().event2index(BUILTIN_L4_EVENT); + mgr.borrow_mut().trigger(builtin_l4_event, session); + } + + pub fn trigger_tcp_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let builtin_tcp_event = mgr.borrow_mut().event2index(BUILTIN_TCP_EVENT); + mgr.borrow_mut().trigger(builtin_tcp_event, session); } pub fn trigger_tcp_opening_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_tcp_opening_event = mgr.borrow_mut().event2index(BUILDIN_TCP_OPENING_EVENT); - mgr.borrow_mut().trigger(buildin_tcp_opening_event, session); + let builtin_tcp_opening_event = mgr.borrow_mut().event2index(BUILTIN_TCP_OPENING_EVENT); + mgr.borrow_mut().trigger(builtin_tcp_opening_event, session); } pub fn trigger_tcp_active_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_tcp_active_event = mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT); - mgr.borrow_mut().trigger(buildin_tcp_active_event, session); + let builtin_tcp_active_event = mgr.borrow_mut().event2index(BUILTIN_TCP_ACTIVE_EVENT); + mgr.borrow_mut().trigger(builtin_tcp_active_event, session); } pub fn trigger_tcp_expire_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_tcp_expire_event = mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT); - mgr.borrow_mut().trigger(buildin_tcp_expire_event, session); + let builtin_tcp_expire_event = mgr.borrow_mut().event2index(BUILTIN_TCP_EXPIRE_EVENT); + mgr.borrow_mut().trigger(builtin_tcp_expire_event, session); } pub fn trigger_tcp_closed_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_tcp_closed_event = mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT); - mgr.borrow_mut().trigger(buildin_tcp_closed_event, session); + let builtin_tcp_closed_event = mgr.borrow_mut().event2index(BUILTIN_TCP_CLOSED_EVENT); + mgr.borrow_mut().trigger(builtin_tcp_closed_event, session); + } + + pub fn trigger_udp_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let builtin_udp_event = mgr.borrow_mut().event2index(BUILTIN_UDP_EVENT); + mgr.borrow_mut().trigger(builtin_udp_event, session); } pub fn trigger_udp_opening_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_udp_opening_event = mgr.borrow_mut().event2index(BUILDIN_UDP_OPENING_EVENT); - mgr.borrow_mut().trigger(buildin_udp_opening_event, session); + let builtin_udp_opening_event = mgr.borrow_mut().event2index(BUILTIN_UDP_OPENING_EVENT); + mgr.borrow_mut().trigger(builtin_udp_opening_event, session); } pub fn trigger_udp_active_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_udp_active_event = mgr.borrow_mut().event2index(BUILDIN_UDP_ACTIVE_EVENT); - mgr.borrow_mut().trigger(buildin_udp_active_event, session); + let builtin_udp_active_event = mgr.borrow_mut().event2index(BUILTIN_UDP_ACTIVE_EVENT); + mgr.borrow_mut().trigger(builtin_udp_active_event, session); } pub fn trigger_udp_expire_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_udp_expire_event = mgr.borrow_mut().event2index(BUILDIN_UDP_EXPIRE_EVENT); - mgr.borrow_mut().trigger(buildin_udp_expire_event, session); + let builtin_udp_expire_event = mgr.borrow_mut().event2index(BUILTIN_UDP_EXPIRE_EVENT); + mgr.borrow_mut().trigger(builtin_udp_expire_event, session); } pub fn trigger_l7_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { - let buildin_l7_event = mgr.borrow_mut().event2index(BUILDIN_L7_EVENT); - mgr.borrow_mut().trigger(buildin_l7_event, session); + let builtin_l7_event = mgr.borrow_mut().event2index(BUILTIN_L7_EVENT); + mgr.borrow_mut().trigger(builtin_l7_event, session); } pub fn trigger_dns_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_dns_event = mgr.borrow_mut().event2index(BUILDIN_DNS_EVENT); - mgr.borrow_mut().trigger(buildin_dns_event, session); + let builtin_dns_event = mgr.borrow_mut().event2index(BUILTIN_DNS_EVENT); + mgr.borrow_mut().trigger(builtin_dns_event, session); } pub fn trigger_http_event( mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>, ) { - let buildin_http_event = mgr.borrow_mut().event2index(BUILDIN_HTTP_EVENT); - mgr.borrow_mut().trigger(buildin_http_event, session); + let builtin_http_event = mgr.borrow_mut().event2index(BUILTIN_HTTP_EVENT); + mgr.borrow_mut().trigger(builtin_http_event, session); } } diff --git a/src/event/manager.rs b/src/event/manager.rs index 64975d6..867a77f 100644 --- a/src/event/manager.rs +++ b/src/event/manager.rs @@ -7,7 +7,12 @@ use std::rc::Rc; pub trait EventHandle { fn init(&mut self); - fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>); + fn handle( + &mut self, + index: usize, + packet: Option<&Packet>, + session: Option<Rc<RefCell<Session>>>, + ); } pub struct EventManager { @@ -61,7 +66,7 @@ impl EventManager { self.ready_event.push_back((index, session)); } - pub fn dispatch(&mut self, packet: &Packet) { + pub fn dispatch(&mut self, packet: Option<&Packet>) { loop { if let Some(event) = self.ready_event.pop_front() { println!("Dispatch event: {:?}", self.index2event.get(&event.0)); @@ -222,7 +227,7 @@ mod tests { // Handle packet let mut packet = Packet::new(&bytes, bytes.len() as u32); - let result = packet.handle(thread_ctx.clone()); + let result = packet.handle(); match result { Ok(_v) => { // println!("SUCCESS: {:?}, {:?}", packet, _v); @@ -242,7 +247,7 @@ mod tests { .borrow_mut() .get_event_mgr() .borrow_mut() - .dispatch(&packet); + .dispatch(Some(&packet)); // assert_eq!(1, 0); } @@ -3,4 +3,5 @@ pub mod protocol; pub mod event; pub mod session; pub mod plugin; -pub mod thread;
\ No newline at end of file +pub mod thread; +pub mod utils;
\ No newline at end of file diff --git a/src/main.rs b/src/main.rs index a238862..c1286bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,88 @@ use std::cell::RefCell; use std::rc::Rc; +use stellar_rs::event::event::BuiltInEvent; use stellar_rs::event::manager::EventHandle; +use stellar_rs::event::manager::EventManager; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Packet; +use stellar_rs::packet::packet::PacketEvent; use stellar_rs::plugin::example::ExamplePulgin; +use stellar_rs::session::session::Session; +use stellar_rs::session::session::SessionProto; +use stellar_rs::session::session::SessionState; use stellar_rs::thread::thread::ThreadContex; -fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { +fn trigger_packet_event( + packet: &Packet, + session: Option<Rc<RefCell<Session>>>, + event_mgr: Rc<RefCell<EventManager>>, +) { + for packet_event in &packet.event { + match packet_event { + PacketEvent::L2_EVENT => { + BuiltInEvent::trigger_l2_event(event_mgr.clone(), session.clone()); + } + PacketEvent::L3_EVENT => { + BuiltInEvent::trigger_l3_event(event_mgr.clone(), session.clone()); + } + PacketEvent::IPV4_EVENT => { + BuiltInEvent::trigger_ip4_event(event_mgr.clone(), session.clone()); + } + PacketEvent::IPV6_EVENT => { + BuiltInEvent::trigger_ip6_event(event_mgr.clone(), session.clone()); + } + PacketEvent::L4_EVENT => { + BuiltInEvent::trigger_l4_event(event_mgr.clone(), session.clone()); + } + PacketEvent::TCP_EVENT => { + BuiltInEvent::trigger_tcp_event(event_mgr.clone(), session.clone()); + } + PacketEvent::UDP_EVENT => { + BuiltInEvent::trigger_udp_event(event_mgr.clone(), session.clone()); + } + } + } +} + +fn trigger_session_event( + session: Option<Rc<RefCell<Session>>>, + event_mgr: Rc<RefCell<EventManager>>, +) { + if session.is_none() { + return; + } + + let session_state = session.clone().unwrap().borrow_mut().get_session_state(); + let session_proto = session.clone().unwrap().borrow_mut().get_session_proto(); + + match session_state { + SessionState::New => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_opening_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_opening_event(event_mgr, session), + }, + SessionState::Active => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_active_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_active_event(event_mgr, session), + }, + SessionState::Inactive => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_closed_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session), + }, + SessionState::Expired => match session_proto { + SessionProto::TCP => BuiltInEvent::trigger_tcp_expire_event(event_mgr, session), + SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session), + }, + } +} + +fn capture_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { let event_mgr = ctx.borrow_mut().get_event_mgr(); let session_mgr = ctx.borrow_mut().get_session_mgr(); - let empty = Packet::new(b"", 0); let mut packet = Packet::new(data, len); - let result = packet.handle(ctx); + + // Handle Packet + let result = packet.handle(); match result { Ok(_left) => { // println!("SUCCESS: {:?}, {:?}", packet, left); @@ -23,14 +93,28 @@ fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { // println!("ERROR Data: {:?}", packet); // println!("ERROR Code: {:?}", e); println!("ERROR Desc: {}", e); + return; } } - // Hanlde Packet Event - event_mgr.borrow_mut().dispatch(&packet); - // Hanlde Expire Event - session_mgr.borrow_mut().expire_sessions(); - event_mgr.borrow_mut().dispatch(&empty); + // Packets have sessions, triggering both packet events and session events + if packet.get_inner_tuple().is_some() { + let flow_id = packet.get_flow_id().unwrap(); + let session = session_mgr.borrow_mut().update(flow_id); + trigger_packet_event(&packet, Some(session.clone()), event_mgr.clone()); + trigger_session_event(Some(session.clone()), event_mgr.clone()); + // Packets have no sessions, only packet events are triggered + } else { + trigger_packet_event(&packet, None, event_mgr.clone()); + } + + // Handle packet events and session events on the current package + event_mgr.borrow_mut().dispatch(Some(&packet)); + + // Handle expire events without packets + let session = session_mgr.borrow_mut().expire_oldest_session(); + trigger_session_event(session, event_mgr.clone()); + event_mgr.borrow_mut().dispatch(None); } fn main() { @@ -40,5 +124,5 @@ fn main() { PacketCapture::show_devices(); let mut cap = PacketCapture::new("en0"); - cap.poll_packet(packet_callback, thread_ctx); + cap.poll_packet(capture_callback, thread_ctx); } diff --git a/src/packet/error.rs b/src/packet/error.rs index fe0620e..174930c 100644 --- a/src/packet/error.rs +++ b/src/packet/error.rs @@ -18,10 +18,6 @@ pub enum PacketError { // L4 IncompleteUdpHeader, IncompleteTcpHeader, - - // L7 - IncompleteAppHeader, - UnsupportAppProtocol, } impl core::fmt::Display for PacketError { @@ -40,9 +36,6 @@ impl core::fmt::Display for PacketError { // L4 PacketError::IncompleteUdpHeader => write!(f, "Incomplete UDP Header"), PacketError::IncompleteTcpHeader => write!(f, "Incomplete TCP Header"), - // L7 - PacketError::IncompleteAppHeader => write!(f, "Incomplete App Header"), - PacketError::UnsupportAppProtocol => write!(f, "Unsupport App Protocol"), } } } diff --git a/src/packet/packet.rs b/src/packet/packet.rs index f80f4d6..2336946 100644 --- a/src/packet/packet.rs +++ b/src/packet/packet.rs @@ -1,38 +1,37 @@ -use crate::event::event::BuildInEvent; use crate::packet::error::PacketError; use crate::protocol::codec::Decode; -use crate::protocol::dns::DNS_MESSAGE; use crate::protocol::ethernet::EtherType; use crate::protocol::ethernet::EthernetFrame; -use crate::protocol::http::HTTP_MESSAGE; use crate::protocol::ip::IPProtocol; use crate::protocol::ipv4::IPv4Header; use crate::protocol::ipv6::IPv6Header; use crate::protocol::tcp::TcpHeader; use crate::protocol::udp::UdpHeader; -use crate::session::session::packet2session; -use crate::session::session::Session; -use crate::session::session::SessionProto; -use crate::session::session::SessionState; -use crate::thread::thread::ThreadContex; -use std::cell::RefCell; -use std::rc::Rc; #[allow(non_camel_case_types)] #[derive(Clone, Debug, PartialEq)] pub enum Encapsulation<'a> { L2_ETH(EthernetFrame, &'a [u8]), - L3_IP4(IPv4Header, &'a [u8]), - L3_IP6(IPv6Header, &'a [u8]), + L3_IPV4(IPv4Header, &'a [u8]), + L3_IPV6(IPv6Header, &'a [u8]), L4_TCP(TcpHeader, &'a [u8]), L4_UDP(UdpHeader, &'a [u8]), - L7_DNS(DNS_MESSAGE, &'a [u8]), - L7_HTTP(HTTP_MESSAGE, &'a [u8]), + UNSUPPORTED(&'a [u8]), +} - Unsupported(&'a [u8]), +#[allow(non_camel_case_types)] +#[derive(Clone, Debug, PartialEq)] +pub enum PacketEvent { + L2_EVENT, + L3_EVENT, + IPV4_EVENT, + IPV6_EVENT, + L4_EVENT, + TCP_EVENT, + UDP_EVENT, } #[derive(Debug)] @@ -40,6 +39,7 @@ pub struct Packet<'a> { pub orig_data: &'a [u8], pub orig_len: u32, pub encapsulation: Vec<Encapsulation<'a>>, + pub event: Vec<PacketEvent>, } impl Packet<'_> { @@ -48,24 +48,25 @@ impl Packet<'_> { orig_data: data, orig_len: len, encapsulation: vec![], + event: vec![], } } - pub fn handle(&mut self, ctx: Rc<RefCell<ThreadContex>>) -> Result<(), PacketError> { + pub fn handle(&mut self) -> Result<(), PacketError> { if self.orig_data.len() != self.orig_len as usize { return Err(PacketError::InvalidPacketLength); } - return handle_l2(self, self.orig_data, ctx); + return handle_l2(self, self.orig_data); } pub fn get_outer_l3_layer(&self) -> Option<Encapsulation> { let num = self.encapsulation.len(); for i in 0..num { match self.encapsulation[i] { - Encapsulation::L3_IP4(_, _) => { + Encapsulation::L3_IPV4(_, _) => { return Some(self.encapsulation[i].clone()); } - Encapsulation::L3_IP6(_, _) => { + Encapsulation::L3_IPV6(_, _) => { return Some(self.encapsulation[i].clone()); } _ => continue, @@ -79,10 +80,10 @@ impl Packet<'_> { let num = self.encapsulation.len(); for i in (0..num).rev() { match self.encapsulation[i] { - Encapsulation::L3_IP4(_, _) => { + Encapsulation::L3_IPV4(_, _) => { return Some(self.encapsulation[i].clone()); } - Encapsulation::L3_IP6(_, _) => { + Encapsulation::L3_IPV6(_, _) => { return Some(self.encapsulation[i].clone()); } _ => continue, @@ -130,13 +131,13 @@ impl Packet<'_> { let num = self.encapsulation.len(); for i in 0..num { match self.encapsulation[i] { - Encapsulation::L3_IP4(ref header, _) => { + Encapsulation::L3_IPV4(ref header, _) => { return Some(( header.source_address.to_string(), header.dest_address.to_string(), )); } - Encapsulation::L3_IP6(ref header, _) => { + Encapsulation::L3_IPV6(ref header, _) => { return Some(( header.source_address.to_string(), header.dest_address.to_string(), @@ -153,13 +154,13 @@ impl Packet<'_> { let num = self.encapsulation.len(); for i in (0..num).rev() { match self.encapsulation[i] { - Encapsulation::L3_IP4(ref header, _) => { + Encapsulation::L3_IPV4(ref header, _) => { return Some(( header.source_address.to_string(), header.dest_address.to_string(), )); } - Encapsulation::L3_IP6(ref header, _) => { + Encapsulation::L3_IPV6(ref header, _) => { return Some(( header.source_address.to_string(), header.dest_address.to_string(), @@ -213,7 +214,7 @@ impl Packet<'_> { } for i in 0..num - 1 { match self.encapsulation[i] { - Encapsulation::L3_IP4(ref l3_header, _) => match self.encapsulation[i + 1] { + Encapsulation::L3_IPV4(ref l3_header, _) => match self.encapsulation[i + 1] { Encapsulation::L4_TCP(ref l4_header, _) => { return Some(( l3_header.source_address.to_string(), @@ -232,7 +233,7 @@ impl Packet<'_> { } _ => continue, }, - Encapsulation::L3_IP6(ref l3_header, _) => match self.encapsulation[i + 1] { + Encapsulation::L3_IPV6(ref l3_header, _) => match self.encapsulation[i + 1] { Encapsulation::L4_TCP(ref l4_header, _) => { return Some(( l3_header.source_address.to_string(), @@ -266,7 +267,7 @@ impl Packet<'_> { for i in (1..num).rev() { match self.encapsulation[i] { Encapsulation::L4_TCP(ref l4_header, _) => match self.encapsulation[i - 1] { - Encapsulation::L3_IP4(ref l3_header, _) => { + Encapsulation::L3_IPV4(ref l3_header, _) => { return Some(( l3_header.source_address.to_string(), l4_header.source_port, @@ -274,7 +275,7 @@ impl Packet<'_> { l4_header.dest_port, )); } - Encapsulation::L3_IP6(ref l3_header, _) => { + Encapsulation::L3_IPV6(ref l3_header, _) => { return Some(( l3_header.source_address.to_string(), l4_header.source_port, @@ -285,7 +286,7 @@ impl Packet<'_> { _ => continue, }, Encapsulation::L4_UDP(ref l4_header, _) => match self.encapsulation[i - 1] { - Encapsulation::L3_IP4(ref l3_header, _) => { + Encapsulation::L3_IPV4(ref l3_header, _) => { return Some(( l3_header.source_address.to_string(), l4_header.source_port, @@ -293,7 +294,7 @@ impl Packet<'_> { l4_header.dest_port, )); } - Encapsulation::L3_IP6(ref l3_header, _) => { + Encapsulation::L3_IPV6(ref l3_header, _) => { return Some(( l3_header.source_address.to_string(), l4_header.source_port, @@ -310,116 +311,55 @@ impl Packet<'_> { return None; } - pub fn get_trace_id(&self) -> Option<String> { - let num = self.encapsulation.len(); - let mut trace_id = String::new(); - for i in 0..num { - match self.encapsulation[i] { - Encapsulation::L3_IP4(ref l3_header, _) => { - trace_id.push_str("IP4->IP4;"); - trace_id.push_str(&l3_header.source_address.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l3_header.dest_address.to_string()); - trace_id.push_str(";"); - } - Encapsulation::L3_IP6(ref l3_header, _) => { - trace_id.push_str("IP6->IP6;"); - trace_id.push_str(&l3_header.source_address.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l3_header.dest_address.to_string()); - trace_id.push_str(";"); - } - Encapsulation::L4_TCP(ref l4_header, _) => { - trace_id.push_str("TCP->TCP;"); - trace_id.push_str(&l4_header.source_port.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l4_header.dest_port.to_string()); - trace_id.push_str(";"); - } - Encapsulation::L4_UDP(ref l4_header, _) => { - trace_id.push_str("UDP->UDP;"); - trace_id.push_str(&l4_header.source_port.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l4_header.dest_port.to_string()); - trace_id.push_str(";"); - } - _ => continue, - } - } - - Some(trace_id) - } - - pub fn get_reversed_trace_id(&self) -> Option<String> { + pub fn get_flow_id(&self) -> Option<String> { let num = self.encapsulation.len(); - let mut trace_id = String::new(); + let mut flow_id = String::new(); for i in 0..num { match self.encapsulation[i] { - Encapsulation::L3_IP4(ref l3_header, _) => { - trace_id.push_str("IP4->IP4;"); - trace_id.push_str(&l3_header.dest_address.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l3_header.source_address.to_string()); - trace_id.push_str(";"); + Encapsulation::L3_IPV4(ref l3_header, _) => { + flow_id.push_str(&l3_header.source_address.to_string()); + flow_id.push_str("->"); + flow_id.push_str(&l3_header.dest_address.to_string()); + flow_id.push_str(";"); } - Encapsulation::L3_IP6(ref l3_header, _) => { - trace_id.push_str("IP6->IP6;"); - trace_id.push_str(&l3_header.dest_address.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l3_header.source_address.to_string()); - trace_id.push_str(";"); + Encapsulation::L3_IPV6(ref l3_header, _) => { + flow_id.push_str(&l3_header.source_address.to_string()); + flow_id.push_str("->"); + flow_id.push_str(&l3_header.dest_address.to_string()); + flow_id.push_str(";"); } Encapsulation::L4_TCP(ref l4_header, _) => { - trace_id.push_str("TCP->TCP;"); - trace_id.push_str(&l4_header.dest_port.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l4_header.source_port.to_string()); - trace_id.push_str(";"); + flow_id.push_str("TCP->TCP;"); + flow_id.push_str(&l4_header.source_port.to_string()); + flow_id.push_str("->"); + flow_id.push_str(&l4_header.dest_port.to_string()); + flow_id.push_str(";"); } Encapsulation::L4_UDP(ref l4_header, _) => { - trace_id.push_str("UDP->UDP;"); - trace_id.push_str(&l4_header.dest_port.to_string()); - trace_id.push_str("->"); - trace_id.push_str(&l4_header.source_port.to_string()); - trace_id.push_str(";"); + flow_id.push_str("UDP->UDP;"); + flow_id.push_str(&l4_header.source_port.to_string()); + flow_id.push_str("->"); + flow_id.push_str(&l4_header.dest_port.to_string()); + flow_id.push_str(";"); } _ => continue, } } - Some(trace_id) + Some(flow_id) } } -pub fn reverse_trace_id(trace_id: &String) -> String { - let mut reversed_trace_id = String::new(); - let mut trace_id_vec: Vec<&str> = trace_id.split(";").collect(); - trace_id_vec.pop(); - for item in trace_id_vec.iter() { - let mut item_vec: Vec<&str> = item.split("->").collect(); - item_vec.reverse(); - reversed_trace_id.push_str(&item_vec.join("->")); - reversed_trace_id.push_str(";"); - } - reversed_trace_id -} - -fn handle_l2<'a>( - packet: &mut Packet<'a>, - input: &'a [u8], - ctx: Rc<RefCell<ThreadContex>>, -) -> Result<(), PacketError> { - let event_mgr = ctx.borrow().get_event_mgr(); +fn handle_l2<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { + packet.event.push(PacketEvent::L2_EVENT); let result = EthernetFrame::decode(input); if let Ok((payload, header)) = result { dbg!(&header); packet .encapsulation .push(Encapsulation::L2_ETH(header, payload)); - BuildInEvent::trigger_l2_event(event_mgr, None); - return handle_l3(packet, payload, header.ether_type, ctx); + return handle_l3(packet, payload, header.ether_type); } else { - BuildInEvent::trigger_l2_event(event_mgr, None); return Err(PacketError::IncompleteEthernetFrame); } } @@ -428,9 +368,8 @@ fn handle_l3<'a>( packet: &mut Packet<'a>, input: &'a [u8], next_proto: EtherType, - ctx: Rc<RefCell<ThreadContex>>, ) -> Result<(), PacketError> { - let event_mgr = ctx.borrow().get_event_mgr(); + packet.event.push(PacketEvent::L3_EVENT); match next_proto { EtherType::IPv4 => { let result = IPv4Header::decode(input); @@ -443,15 +382,13 @@ fn handle_l3<'a>( packet .encapsulation - .push(Encapsulation::L3_IP4(header, payload)); + .push(Encapsulation::L3_IPV4(header, payload)); // TODO IPv4 Fragment - BuildInEvent::trigger_l3_event(event_mgr.clone(), None); - BuildInEvent::trigger_ip4_event(event_mgr, None); - return handle_l4(packet, payload, header.protocol, ctx); + packet.event.push(PacketEvent::IPV4_EVENT); + return handle_l4(packet, payload, header.protocol); } else { - BuildInEvent::trigger_l3_event(event_mgr, None); return Err(PacketError::IncompleteIpv4Header); } } @@ -466,20 +403,17 @@ fn handle_l3<'a>( packet .encapsulation - .push(Encapsulation::L3_IP6(header, payload)); + .push(Encapsulation::L3_IPV6(header, payload)); // TODO IPv6 Fragment - BuildInEvent::trigger_l3_event(event_mgr.clone(), None); - BuildInEvent::trigger_ip6_event(event_mgr, None); - return handle_l4(packet, payload, header.next_header, ctx); + packet.event.push(PacketEvent::IPV6_EVENT); + return handle_l4(packet, payload, header.next_header); } else { - BuildInEvent::trigger_l3_event(event_mgr, None); return Err(PacketError::IncompleteIpv6Header); } } _e => { - BuildInEvent::trigger_l3_event(event_mgr, None); return Err(PacketError::UnsupportEthernetType); } } @@ -489,10 +423,8 @@ fn handle_l4<'a>( packet: &mut Packet<'a>, input: &'a [u8], next_proto: IPProtocol, - ctx: Rc<RefCell<ThreadContex>>, ) -> Result<(), PacketError> { - let event_mgr = ctx.borrow().get_event_mgr(); - let session_mgr = ctx.borrow().get_session_mgr(); + packet.event.push(PacketEvent::L4_EVENT); match next_proto { IPProtocol::UDP => { let result = UdpHeader::decode(input); @@ -502,23 +434,9 @@ fn handle_l4<'a>( .encapsulation .push(Encapsulation::L4_UDP(header, payload)); - let session = packet2session(&packet, session_mgr); - BuildInEvent::trigger_l4_event(event_mgr.clone(), Some(session.clone())); - session.borrow_mut().set_session_proto(SessionProto::UDP); - match session.borrow().get_session_state() { - SessionState::New => { - BuildInEvent::trigger_udp_opening_event(event_mgr, Some(session.clone())); - } - SessionState::Active => { - BuildInEvent::trigger_udp_active_event(event_mgr, Some(session.clone())); - } - SessionState::Inactive | SessionState::Expired => { - BuildInEvent::trigger_udp_expire_event(event_mgr, Some(session.clone())); - } - } - return handle_l7(packet, payload, session, ctx); + packet.event.push(PacketEvent::UDP_EVENT); + return Ok(()); } else { - BuildInEvent::trigger_l4_event(event_mgr, None); return Err(PacketError::IncompleteUdpHeader); } } @@ -527,119 +445,24 @@ fn handle_l4<'a>( if let Ok((payload, header)) = result { dbg!(&header); - let tcp_need_closed = header.flag_rst || header.flag_fin; packet .encapsulation .push(Encapsulation::L4_TCP(header, payload)); // TODO TCP Reassembly - let session = packet2session(&packet, session_mgr); - BuildInEvent::trigger_l4_event(event_mgr.clone(), Some(session.clone())); - if tcp_need_closed { - session - .borrow_mut() - .set_session_state(SessionState::Inactive); - } - session.borrow_mut().set_session_proto(SessionProto::TCP); - match session.borrow().get_session_state() { - SessionState::New => { - BuildInEvent::trigger_tcp_opening_event(event_mgr, Some(session.clone())); - } - SessionState::Active => { - BuildInEvent::trigger_tcp_active_event(event_mgr, Some(session.clone())); - } - SessionState::Expired => { - BuildInEvent::trigger_tcp_expire_event(event_mgr, Some(session.clone())); - } - SessionState::Inactive => { - BuildInEvent::trigger_tcp_closed_event(event_mgr, Some(session.clone())); - } - } - - return handle_l7(packet, payload, session, ctx); + packet.event.push(PacketEvent::TCP_EVENT); + return Ok(()); } else { - BuildInEvent::trigger_l4_event(event_mgr, None); return Err(PacketError::IncompleteTcpHeader); } } _e => { - BuildInEvent::trigger_l4_event(event_mgr, None); return Err(PacketError::UnsupportIPProtocol); } } } -fn handle_l7<'a>( - packet: &mut Packet<'a>, - input: &'a [u8], - session: Rc<RefCell<Session>>, - ctx: Rc<RefCell<ThreadContex>>, -) -> Result<(), PacketError> { - let event_mgr = ctx.borrow().get_event_mgr(); - let next_proto = l7_identify(packet); - match next_proto { - L7Protocol::DNS => { - let result = DNS_MESSAGE::decode(input); - if let Ok((payload, header)) = result { - dbg!(&header); - packet - .encapsulation - .push(Encapsulation::L7_DNS(header, payload)); - BuildInEvent::trigger_l7_event(event_mgr.clone(), Some(session.clone())); - BuildInEvent::trigger_dns_event(event_mgr, Some(session)); - return Ok(()); - } else { - BuildInEvent::trigger_l7_event(event_mgr, Some(session)); - return Err(PacketError::IncompleteAppHeader); - } - } - L7Protocol::HTTP => { - let result = HTTP_MESSAGE::decode(input); - if let Ok((payload, header)) = result { - dbg!(&header); - packet - .encapsulation - .push(Encapsulation::L7_HTTP(header, payload)); - BuildInEvent::trigger_l7_event(event_mgr.clone(), Some(session.clone())); - BuildInEvent::trigger_http_event(event_mgr, Some(session)); - return Ok(()); - } else { - BuildInEvent::trigger_l7_event(event_mgr, Some(session)); - return Err(PacketError::IncompleteAppHeader); - } - } - L7Protocol::Unsupported => { - BuildInEvent::trigger_l7_event(event_mgr, Some(session)); - return Err(PacketError::UnsupportAppProtocol); - } - } -} - -enum L7Protocol { - DNS, - HTTP, - Unsupported, -} - -fn l7_identify(packet: &Packet) -> L7Protocol { - let option = packet.get_inner_port(); - if option.is_none() { - return L7Protocol::Unsupported; - } - - let (src_port, dst_port) = option.unwrap(); - if src_port == 80 || dst_port == 80 { - return L7Protocol::HTTP; - } - - if src_port == 53 || dst_port == 53 { - return L7Protocol::DNS; - } - - return L7Protocol::Unsupported; -} - /****************************************************************************** * TEST ******************************************************************************/ @@ -648,17 +471,13 @@ fn l7_identify(packet: &Packet) -> L7Protocol { mod tests { use super::Encapsulation; use super::Packet; - use crate::packet::packet::reverse_trace_id; use crate::protocol::ip::IPProtocol; use crate::protocol::ipv4::IPv4Header; use crate::protocol::ipv6::IPv6Header; use crate::protocol::tcp::TcpHeader; use crate::protocol::udp::UdpHeader; - use crate::thread::thread::ThreadContex; - use std::cell::RefCell; use std::net::Ipv4Addr; use std::net::Ipv6Addr; - use std::rc::Rc; #[test] fn test_packet_handle1() { @@ -756,9 +575,8 @@ mod tests { 0x00, 0x01, 0x00, 0x01, /* DNS END */ ]; - let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); let mut packet = Packet::new(&bytes, bytes.len() as u32); - let result = packet.handle(thread_ctx); + let result = packet.handle(); match result { Ok(v) => { @@ -898,9 +716,8 @@ mod tests { 0x2a, 0x0d, 0x0a, 0x0d, 0x0a, /* HTTP END */ ]; - let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); let mut packet = Packet::new(&bytes, bytes.len() as u32); - let result = packet.handle(thread_ctx); + let result = packet.handle(); match result { Ok(v) => { @@ -976,13 +793,13 @@ mod tests { packet .encapsulation - .push(Encapsulation::L3_IP4(ipv4_hdr.clone(), b"1")); + .push(Encapsulation::L3_IPV4(ipv4_hdr.clone(), b"1")); packet .encapsulation .push(Encapsulation::L4_TCP(tcp_hdr.clone(), b"2")); packet .encapsulation - .push(Encapsulation::L3_IP6(ipv6_hdr.clone(), b"3")); + .push(Encapsulation::L3_IPV6(ipv6_hdr.clone(), b"3")); packet .encapsulation .push(Encapsulation::L4_UDP(udp_hdr.clone(), b"4")); @@ -1023,11 +840,11 @@ mod tests { assert_eq!( packet.get_outer_l3_layer(), - Some(Encapsulation::L3_IP4(ipv4_hdr, b"1")) + Some(Encapsulation::L3_IPV4(ipv4_hdr, b"1")) ); assert_eq!( packet.get_inner_l3_layer(), - Some(Encapsulation::L3_IP6(ipv6_hdr, b"3")) + Some(Encapsulation::L3_IPV6(ipv6_hdr, b"3")) ); assert_eq!( packet.get_outer_l4_layer(), @@ -1038,8 +855,6 @@ mod tests { Some(Encapsulation::L4_UDP(udp_hdr, b"4")) ); - assert_eq!(packet.get_trace_id(), Some("IP4->IP4;192.168.0.101->121.14.154.93;TCP->TCP;50081->443;IP6->IP6;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string())); - assert_eq!(packet.get_reversed_trace_id() ,Some("IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string())); - assert_eq!(reverse_trace_id(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()); + assert_eq!(packet.get_flow_id(), Some("192.168.0.101->121.14.154.93;TCP->TCP;50081->443;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string())); } } diff --git a/src/plugin/example.rs b/src/plugin/example.rs index eb38c2c..a3ca852 100644 --- a/src/plugin/example.rs +++ b/src/plugin/example.rs @@ -44,22 +44,22 @@ impl EventHandle for ExamplePulgin { self.tcp_opening_event = event_mgr .borrow_mut() - .event2index(BUILDIN_TCP_OPENING_EVENT); + .event2index(BUILTIN_TCP_OPENING_EVENT); event_mgr .borrow_mut() .register(self.tcp_opening_event, Box::new(self.clone())); - self.tcp_active_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT); + self.tcp_active_event = event_mgr.borrow_mut().event2index(BUILTIN_TCP_ACTIVE_EVENT); event_mgr .borrow_mut() .register(self.tcp_active_event, Box::new(self.clone())); - self.tcp_expire_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT); + self.tcp_expire_event = event_mgr.borrow_mut().event2index(BUILTIN_TCP_EXPIRE_EVENT); event_mgr .borrow_mut() .register(self.tcp_expire_event, Box::new(self.clone())); - self.tcp_closed_event = event_mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT); + self.tcp_closed_event = event_mgr.borrow_mut().event2index(BUILTIN_TCP_CLOSED_EVENT); event_mgr .borrow_mut() .register(self.tcp_closed_event, Box::new(self.clone())); @@ -72,7 +72,12 @@ impl EventHandle for ExamplePulgin { .register(self.http_opening_event, Box::new(self.clone())); } - fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>) { + fn handle( + &mut self, + index: usize, + packet: Option<&Packet>, + session: Option<Rc<RefCell<Session>>>, + ) { if session.is_none() { return; } @@ -83,22 +88,22 @@ impl EventHandle for ExamplePulgin { let session = session.unwrap(); if index == self.tcp_opening_event { println!( - "{} handle BUILDIN_TCP_OPENING_EVENT: {:?}", + "{} handle BUILTIN_TCP_OPENING_EVENT: {:?}", self.plugin_name, session ); } else if index == self.tcp_active_event { println!( - "{} handle BUILDIN_TCP_ACTIVE_EVENT: {:?}", + "{} handle BUILTIN_TCP_ACTIVE_EVENT: {:?}", self.plugin_name, session ); } else if index == self.tcp_expire_event { println!( - "{} handle BUILDIN_TCP_EXPIRE_EVENT: {:?}", + "{} handle BUILTIN_TCP_EXPIRE_EVENT: {:?}", self.plugin_name, session ); } else if index == self.tcp_closed_event { println!( - "{} handle BUILDIN_TCP_CLOSED_EVENT: {:?}", + "{} handle BUILTIN_TCP_CLOSED_EVENT: {:?}", self.plugin_name, session ); } else if index == self.http_opening_event { diff --git a/src/session/manager.rs b/src/session/manager.rs index 60222d6..462949f 100644 --- a/src/session/manager.rs +++ b/src/session/manager.rs @@ -1,7 +1,7 @@ -use crate::session::session::reverse_session_id; use crate::session::session::Session; -use crate::session::session::SessionProto; +use crate::session::session::SessionDirection; use crate::session::session::SessionState; +use crate::utils::utils::reverse_flow_id; use chrono::Utc; use std::cell::RefCell; use std::collections::HashMap; @@ -27,13 +27,47 @@ impl SessionManager { } } + pub fn update(&mut self, session_id: String) -> Rc<RefCell<Session>> { + let result = self.get_session(&session_id); + if result.is_none() { + // New Session + let mut new_session = Session::new(session_id.clone()); + new_session.update_session_current_dir(SessionDirection::C2S); + + let rc_new_session = Rc::new(RefCell::new(new_session)); + self.insert_session(session_id.clone(), rc_new_session.clone()); + rc_new_session + } else { + // Update Session + let rc_old_session = result.unwrap(); + rc_old_session + .borrow_mut() + .set_session_state(SessionState::Active); + rc_old_session.borrow_mut().update_session_last_ts(); + rc_old_session.borrow_mut().update_session_expire_ts(); + + // Update Session Direction + if rc_old_session.borrow().get_session_id() == session_id { + rc_old_session + .borrow_mut() + .update_session_current_dir(SessionDirection::C2S); + } else { + rc_old_session + .borrow_mut() + .update_session_current_dir(SessionDirection::S2C); + } + + rc_old_session + } + } + pub fn get_session(&self, session_id: &String) -> Option<Rc<RefCell<Session>>> { let result = self.sessions.get(session_id).map(|session| session.clone()); if result.is_some() { return result; } - let reversed_id = reverse_session_id(session_id); + let reversed_id = reverse_flow_id(session_id); self.sessions .get(reversed_id.as_str()) .map(|session| session.clone()) @@ -49,39 +83,38 @@ impl SessionManager { return result; } - let reversed_id = reverse_session_id(&session_id.to_string()); + let reversed_id = reverse_flow_id(&session_id.to_string()); self.sessions.remove(reversed_id.as_str()) } - pub fn expire_sessions(&mut self) { - let now = Utc::now().timestamp(); - let mut expired_sessions = Vec::with_capacity(1024); - for (session_id, session) in &self.sessions { - if session.borrow().get_session_expire_ts() < now { - expired_sessions.push(session_id.clone()); + pub fn expire_oldest_session(&mut self) -> Option<Rc<RefCell<Session>>> { + let mut oldest_session: Option<Rc<RefCell<Session>>> = None; + let mut oldest_session_expire_ts = Utc::now().timestamp(); + + for (_, session) in &self.sessions { + let session_expire_ts = session.borrow().get_session_expire_ts(); + if session_expire_ts < oldest_session_expire_ts { + oldest_session = Some(session.clone()); + oldest_session_expire_ts = session_expire_ts; } } - for session_id in expired_sessions { - let option = self.remove_session(&session_id); - if let Some(session) = option { - println!("Session expired: {}", session_id); - session - .borrow_mut() - .set_session_state(SessionState::Expired); - session - .borrow_mut() - .set_session_end_ts(Utc::now().timestamp()); - - match session.borrow().get_session_proto() { - SessionProto::TCP => { - //BuildInEvent::trigger_tcp_expire_event(event_mgr, Some(session)); - } - SessionProto::UDP => { - //BuildInEvent::trigger_udp_expire_event(event_mgr, Some(session)); - } - } - } + + if oldest_session.is_some() { + let oldest_session_id = oldest_session.clone().unwrap().borrow().get_session_id(); + oldest_session + .clone() + .unwrap() + .borrow_mut() + .set_session_state(SessionState::Expired); + oldest_session + .clone() + .unwrap() + .borrow_mut() + .set_session_end_ts(oldest_session_expire_ts); + self.remove_session(&oldest_session_id); } + + oldest_session } } @@ -96,12 +129,10 @@ mod tests { use std::cell::RefCell; use std::rc::Rc; - // use std::{thread, time}; - #[test] fn test_session_manager() { - let session_id = "IP4->IP4;192.168.0.1->192.168.0.2;UDP->UDP;2345->80;".to_string(); - let reversed_id = "IP4->IP4;192.168.0.2->192.168.0.1;UDP->UDP;80->2345;".to_string(); + let session_id = "192.168.0.1->192.168.0.2;UDP->UDP;2345->80;".to_string(); + let reversed_id = "192.168.0.2->192.168.0.1;UDP->UDP;80->2345;".to_string(); let session = Rc::new(RefCell::new(Session::new(session_id.clone()))); // Create Session Manager @@ -117,10 +148,6 @@ mod tests { assert_eq!(session_mgr.get_session(&session_id).is_some(), true); assert_eq!(session_mgr.get_session(&reversed_id).is_some(), true); - // Expire Session - // thread::sleep(time::Duration::from_secs(61)); - // session_mgr.expire_sessions(); - // Delete session assert_eq!(session_mgr.remove_session(&reversed_id).is_some(), true); diff --git a/src/session/session.rs b/src/session/session.rs index adbdb64..e8b2be4 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1,10 +1,5 @@ -use crate::packet::packet::reverse_trace_id; -use crate::packet::packet::Packet; -use crate::session::manager::SessionManager; use chrono::Utc; -use std::cell::RefCell; use std::collections::HashMap; -use std::rc::Rc; /****************************************************************************** * Struct @@ -200,59 +195,6 @@ impl Session { } /****************************************************************************** - * Utils API - ******************************************************************************/ - -pub fn reverse_session_id(session_id: &String) -> String { - reverse_trace_id(session_id) -} - -pub fn packet2session( - packet: &Packet, - session_mgr: Rc<RefCell<SessionManager>>, -) -> Rc<RefCell<Session>> { - let rc_session; - let packet_len = packet.orig_len as u64; - - let session_id = packet.get_trace_id().unwrap(); - let option = session_mgr.borrow().get_session(&session_id); - if option.is_none() { - let mut session = Session::new(session_id.clone()); - session.inc_session_c2s_metrics(0, 0, 1, packet_len); - session.update_session_current_dir(SessionDirection::C2S); - rc_session = Rc::new(RefCell::new(session)); - session_mgr - .borrow_mut() - .insert_session(session_id, rc_session.clone()); - } else { - rc_session = option.unwrap(); - rc_session - .borrow_mut() - .set_session_state(SessionState::Active); - - if rc_session.borrow().get_session_id() == session_id { - rc_session - .borrow_mut() - .inc_session_c2s_metrics(0, 0, 1, packet_len); - rc_session - .borrow_mut() - .update_session_current_dir(SessionDirection::C2S); - } else { - rc_session - .borrow_mut() - .inc_session_s2c_metrics(0, 0, 1, packet_len); - rc_session - .borrow_mut() - .update_session_current_dir(SessionDirection::S2C); - } - rc_session.borrow_mut().update_session_last_ts(); - rc_session.borrow_mut().update_session_expire_ts(); - } - - rc_session -} - -/****************************************************************************** * TEST ******************************************************************************/ diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..b5614dd --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod utils; diff --git a/src/utils/utils.rs b/src/utils/utils.rs new file mode 100644 index 0000000..e784b34 --- /dev/null +++ b/src/utils/utils.rs @@ -0,0 +1,27 @@ +pub fn reverse_flow_id(flow_id: &String) -> String { + let mut reversed_flow_id = String::new(); + let mut flow_id_vec: Vec<&str> = flow_id.split(";").collect(); + flow_id_vec.pop(); + for item in flow_id_vec.iter() { + let mut item_vec: Vec<&str> = item.split("->").collect(); + item_vec.reverse(); + reversed_flow_id.push_str(&item_vec.join("->")); + reversed_flow_id.push_str(";"); + } + reversed_flow_id +} + +/****************************************************************************** + * TEST + ******************************************************************************/ + +#[cfg(test)] +mod tests { + use super::reverse_flow_id; + + #[test] + fn test_reverse_flow_id() { + let flow_id = "192.168.0.101->121.14.154.93;TCP->TCP;50081->443;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string(); + assert_eq!(reverse_flow_id(&flow_id), "121.14.154.93->192.168.0.101;TCP->TCP;443->50081;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()); + } +} |
