diff options
| author | luwenpeng <[email protected]> | 2023-08-29 18:43:04 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2023-08-31 18:42:25 +0800 |
| commit | ccf658df8cd062fedb66bb64d3c5a25b2d847f25 (patch) | |
| tree | 182d74d33ba8ad914be2772620f073e62262ae55 | |
| parent | ab66a023549901674110176256df5564ae039ce6 (diff) | |
[feature] Integration Test Passed
1.Receive packets through pcap
2.Packet decapsulate
3.Trigger events
4.Dispatch event management
5.Call plugin handle
6.Plugin manipulate session
| -rw-r--r-- | src/event/event.rs | 232 | ||||
| -rw-r--r-- | src/event/manager.rs | 287 | ||||
| -rw-r--r-- | src/event/mod.rs | 1 | ||||
| -rw-r--r-- | src/lib.rs | 4 | ||||
| -rw-r--r-- | src/main.rs | 87 | ||||
| -rw-r--r-- | src/packet/capture.rs | 14 | ||||
| -rw-r--r-- | src/packet/error.rs | 2 | ||||
| -rw-r--r-- | src/packet/packet.rs | 210 | ||||
| -rw-r--r-- | src/plugin/example.rs | 50 | ||||
| -rw-r--r-- | src/plugin/mod.rs | 1 | ||||
| -rw-r--r-- | src/session/manager.rs | 98 | ||||
| -rw-r--r-- | src/session/session.rs | 212 | ||||
| -rw-r--r-- | src/session/tuple.rs | 80 | ||||
| -rw-r--r-- | src/thread/mod.rs | 1 | ||||
| -rw-r--r-- | src/thread/thread.rs | 26 |
15 files changed, 987 insertions, 318 deletions
diff --git a/src/event/event.rs b/src/event/event.rs index bef0efe..8ab3918 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -1,132 +1,140 @@ -use crate::packet::packet::Packet; +use crate::event::manager::EventManager; +use crate::session::session::Session; use std::cell::RefCell; -use std::collections::HashMap; -use std::collections::VecDeque; +use std::rc::Rc; + +// L2 Event +pub const BUILDIN_L2_EVENT: &str = "BUILDIN_L2_EVENT"; + +// L3 Event +pub const BUILDIN_L3_EVENT: &str = "BUILDIN_L3_EVENT"; +pub const BUILDIN_IP4_EVENT: &str = "BUILDIN_IP4_EVENT"; +pub const BUILDIN_IP6_EVENT: &str = "BUILDIN_IP6_EVENT"; + +// L4 Event +pub const BUILDIN_L4_EVENT: &str = "BUILDIN_L4_EVENT"; +pub const BUILDIN_TCP_OPENING_EVENT: &str = "BUILDIN_TCP_OPENING_EVENT"; +pub const BUILDIN_TCP_ACTIVE_EVENT: &str = "BUILDIN_TCP_ACTIVE_EVENT"; +pub const BUILDIN_TCP_EXPIRE_EVENT: &str = "BUILDIN_TCP_EXPIRE_EVENT"; +pub const BUILDIN_TCP_CLOSED_EVENT: &str = "BUILDIN_TCP_CLOSED_EVENT"; + +pub const BUILDIN_UDP_OPENING_EVENT: &str = "BUILDIN_UDP_OPENING_EVENT"; +pub const BUILDIN_UDP_ACTIVE_EVENT: &str = "BUILDIN_UDP_ACTIVE_EVENT"; +pub const BUILDIN_UDP_EXPIRE_EVENT: &str = "BUILDIN_UDP_EXPIRE_EVENT"; + +// L7 Event +pub const BUILDIN_L7_EVENT: &str = "BUILDIN_L7_EVENT"; +pub const BUILDIN_DNS_EVENT: &str = "BUILDIN_DNS_EVENT"; +pub const BUILDIN_HTTP_EVENT: &str = "BUILDIN_HTTP_EVENT"; + +pub struct BuildInEvent {} + +impl BuildInEvent { + pub fn trigger_l2_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { + let buildin_l2_event = mgr.borrow_mut().event2index(BUILDIN_L2_EVENT); + mgr.borrow_mut().trigger(buildin_l2_event, session); + } -trait EventCallback { - fn handle(&mut self, index: usize, packet: &Packet); -} + pub fn trigger_l3_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { + let buildin_l3_event = mgr.borrow_mut().event2index(BUILDIN_L3_EVENT); + mgr.borrow_mut().trigger(buildin_l3_event, session); + } -struct EventManager { - event_map: HashMap<String, usize>, // event name -> event index - ready_event: VecDeque<usize>, // ready event index - cared_event: HashMap<usize, RefCell<Vec<Box<dyn EventCallback>>>>, // event index -> event callback -} + pub fn trigger_ip4_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_ip4_event = mgr.borrow_mut().event2index(BUILDIN_IP4_EVENT); + mgr.borrow_mut().trigger(buildin_ip4_event, session); + } -impl EventManager { - pub fn new() -> EventManager { - EventManager { - event_map: HashMap::new(), - ready_event: VecDeque::new(), - cared_event: HashMap::new(), - } + pub fn trigger_ip6_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_ip6_event = mgr.borrow_mut().event2index(BUILDIN_IP6_EVENT); + mgr.borrow_mut().trigger(buildin_ip6_event, session); } - pub fn index(&mut self, event: &String) -> usize { - if let Some(index) = self.event_map.get(event) { - *index - } else { - let index = self.event_map.len(); - self.event_map.insert(event.to_string(), index); - index - } + pub fn trigger_l4_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { + let buildin_l4_event = mgr.borrow_mut().event2index(BUILDIN_L4_EVENT); + mgr.borrow_mut().trigger(buildin_l4_event, session); } - pub fn registe(&mut self, index: usize, handle: Box<dyn EventCallback>) { - if let Some(vec) = self.cared_event.get(&index) { - let mut vec = vec.borrow_mut(); - vec.push(handle); - } else { - let mut vec = Vec::with_capacity(1024); - vec.push(handle); - self.cared_event.insert(index, RefCell::new(vec)); - } + pub fn trigger_tcp_opening_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_tcp_opening_event = mgr.borrow_mut().event2index(BUILDIN_TCP_OPENING_EVENT); + mgr.borrow_mut().trigger(buildin_tcp_opening_event, session); } - pub fn trigger(&mut self, index: usize) { - self.ready_event.push_back(index); + pub fn trigger_tcp_active_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_tcp_active_event = mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT); + mgr.borrow_mut().trigger(buildin_tcp_active_event, session); } - pub fn dispatch(&mut self, packet: &Packet) { - loop { - if let Some(index) = self.ready_event.pop_front() { - println!("Dispatch event {:?}", index); - if let Some(vec) = self.cared_event.get(&index) { - let mut vec = vec.borrow_mut(); - for callback in vec.iter_mut() { - callback.handle(index, packet); - } - } - } else { - break; - } - } + pub fn trigger_tcp_expire_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_tcp_expire_event = mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT); + mgr.borrow_mut().trigger(buildin_tcp_expire_event, session); + } + + pub fn trigger_tcp_closed_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_tcp_closed_event = mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT); + mgr.borrow_mut().trigger(buildin_tcp_closed_event, session); + } + + pub fn trigger_udp_opening_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_udp_opening_event = mgr.borrow_mut().event2index(BUILDIN_UDP_OPENING_EVENT); + mgr.borrow_mut().trigger(buildin_udp_opening_event, session); } -} -/****************************************************************************** - * TEST - ******************************************************************************/ - -#[cfg(test)] -mod tests { - use super::EventCallback; - use super::EventManager; - use crate::packet::packet::Packet; - use std::cell::RefCell; - use std::rc::Rc; - - #[derive(Debug, Clone)] - struct Pulgin { - plugin_id: u32, - plugin_ctx: Rc<RefCell<String>>, + pub fn trigger_udp_active_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_udp_active_event = mgr.borrow_mut().event2index(BUILDIN_UDP_ACTIVE_EVENT); + mgr.borrow_mut().trigger(buildin_udp_active_event, session); } - impl Pulgin { - pub fn new(plugin_id: u32, plugin_ctx: Rc<RefCell<String>>) -> Pulgin { - Pulgin { - plugin_id, - plugin_ctx, - } - } + + pub fn trigger_udp_expire_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_udp_expire_event = mgr.borrow_mut().event2index(BUILDIN_UDP_EXPIRE_EVENT); + mgr.borrow_mut().trigger(buildin_udp_expire_event, session); } - impl EventCallback for Pulgin { - fn handle(&mut self, index: usize, packet: &Packet) { - self.plugin_ctx.borrow_mut().push_str(" World"); - println!("Pulgin handle event {:?}, {:?}, {:?}", index, self, packet); - } + + pub fn trigger_l7_event(mgr: Rc<RefCell<EventManager>>, session: Option<Rc<RefCell<Session>>>) { + let buildin_l7_event = mgr.borrow_mut().event2index(BUILDIN_L7_EVENT); + mgr.borrow_mut().trigger(buildin_l7_event, session); + } + + pub fn trigger_dns_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_dns_event = mgr.borrow_mut().event2index(BUILDIN_DNS_EVENT); + mgr.borrow_mut().trigger(buildin_dns_event, session); } - #[test] - fn test_event_manager() { - // create plugin - let plugin_cb = Pulgin::new(1, Rc::new(RefCell::new(String::from("Hello")))); - - // create event manager - let mut mgr: EventManager = EventManager::new(); - - // get event index - let tcp_opening_event = mgr.index(&String::from("TCP_OPENING")); - let tcp_active_event = mgr.index(&String::from("TCP_ACTIVE")); - let tcp_closing_event = mgr.index(&String::from("TCP_CLOSING")); - let tcp_closed_event = mgr.index(&String::from("TCP_CLOSED")); - - // registe event - mgr.registe(tcp_opening_event, Box::new(plugin_cb.clone())); - mgr.registe(tcp_active_event, Box::new(plugin_cb.clone())); - mgr.registe(tcp_closing_event, Box::new(plugin_cb.clone())); - mgr.registe(tcp_closed_event, Box::new(plugin_cb.clone())); - - // trigger event - mgr.trigger(tcp_opening_event); - mgr.trigger(tcp_active_event); - mgr.trigger(tcp_closing_event); - mgr.trigger(tcp_closed_event); - - // dispatch event - let bytes = [0x48, 0x73]; - let packet = Packet::new(&bytes, bytes.len() as u32); - mgr.dispatch(&packet); - - // assert_eq!(1, 0); + pub fn trigger_http_event( + mgr: Rc<RefCell<EventManager>>, + session: Option<Rc<RefCell<Session>>>, + ) { + let buildin_http_event = mgr.borrow_mut().event2index(BUILDIN_HTTP_EVENT); + mgr.borrow_mut().trigger(buildin_http_event, session); } } diff --git a/src/event/manager.rs b/src/event/manager.rs new file mode 100644 index 0000000..587a00b --- /dev/null +++ b/src/event/manager.rs @@ -0,0 +1,287 @@ +use crate::packet::packet::Packet; +use crate::session::session::Session; +use std::cell::RefCell; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::rc::Rc; + +pub trait EventHandle { + fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>); +} + +pub struct EventManager { + index2event: HashMap<usize, &'static str>, // event index -> event name + event2index: HashMap<&'static str, usize>, // event name -> event index + ready_event: VecDeque<(usize, Option<Rc<RefCell<Session>>>)>, // ready event index + cared_event: HashMap<usize, RefCell<Vec<Box<dyn EventHandle>>>>, // event index -> event handle +} + +impl EventManager { + pub fn new() -> EventManager { + EventManager { + index2event: HashMap::new(), + event2index: HashMap::new(), + ready_event: VecDeque::new(), + cared_event: HashMap::new(), + } + } + + pub fn event2index(&mut self, event: &'static str) -> usize { + if let Some(index) = self.event2index.get(event) { + *index + } else { + let index = self.event2index.len(); + self.event2index.insert(event, index); + self.index2event.insert(index, event); + index + } + } + + pub fn index2event(&self, index: usize) -> Option<&'static str> { + if let Some(event) = self.index2event.get(&index) { + Some(*event) + } else { + None + } + } + + pub fn register(&mut self, index: usize, handle: Box<dyn EventHandle>) { + if let Some(vec) = self.cared_event.get(&index) { + let mut vec = vec.borrow_mut(); + vec.push(handle); + } else { + let mut vec = Vec::with_capacity(1024); + vec.push(handle); + self.cared_event.insert(index, RefCell::new(vec)); + } + } + + pub fn trigger(&mut self, index: usize, session: Option<Rc<RefCell<Session>>>) { + self.ready_event.push_back((index, session)); + } + + pub fn dispatch(&mut self, packet: &Packet) { + loop { + if let Some(event) = self.ready_event.pop_front() { + println!("Dispatch event: {:?}", self.index2event.get(&event.0)); + if let Some(vec) = self.cared_event.get(&event.0) { + let mut vec = vec.borrow_mut(); + for callback in vec.iter_mut() { + callback.handle(event.0, packet, event.1.clone()); + } + } + } else { + break; + } + } + } +} + +/****************************************************************************** + * TEST + ******************************************************************************/ + +#[cfg(test)] +mod tests { + use crate::packet::packet::Packet; + use crate::plugin::example::ExamplePulgin; + use crate::thread::thread::ThreadContex; + use std::cell::RefCell; + use std::rc::Rc; + + /* + * Frame 217: 131 bytes on wire (1048 bits), 131 bytes captured (1048 bits) on interface en0, id 0 + * Section number: 1 + * Interface id: 0 (en0) + * Interface name: en0 + * Interface description: Wi-Fi + * Encapsulation type: Ethernet (1) + * Arrival Time: Aug 2, 2023 11:49:21.582237000 CST + * [Time shift for this packet: 0.000000000 seconds] + * Epoch Time: 1690948161.582237000 seconds + * [Time delta from previous captured frame: 0.000042000 seconds] + * [Time delta from previous displayed frame: 0.000000000 seconds] + * [Time since reference or first frame: 4.905717000 seconds] + * Frame Number: 217 + * Frame Length: 131 bytes (1048 bits) + * Capture Length: 131 bytes (1048 bits) + * [Frame is marked: False] + * [Frame is ignored: False] + * [Protocols in frame: eth:ethertype:ip:tcp:http] + * [Coloring Rule Name: HTTP] + * [Coloring Rule String: http || tcp.port == 80 || http2] + * Ethernet II, Src: Apple_0a:c5:ea (3c:a6:f6:0a:c5:ea), Dst: NewH3CTe_96:38:0e (48:73:97:96:38:0e) + * Destination: NewH3CTe_96:38:0e (48:73:97:96:38:0e) + * Address: NewH3CTe_96:38:0e (48:73:97:96:38:0e) + * .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) + * .... ...0 .... .... .... .... = IG bit: Individual address (unicast) + * Source: Apple_0a:c5:ea (3c:a6:f6:0a:c5:ea) + * Address: Apple_0a:c5:ea (3c:a6:f6:0a:c5:ea) + * .... ..0. .... .... .... .... = LG bit: Globally unique address (factory default) + * .... ...0 .... .... .... .... = IG bit: Individual address (unicast) + * Type: IPv4 (0x0800) + * Internet Protocol Version 4, Src: 192.168.38.63, Dst: 182.61.200.6 + * 0100 .... = Version: 4 + * .... 0101 = Header Length: 20 bytes (5) + * Differentiated Services Field: 0x00 (DSCP: CS0, ECN: Not-ECT) + * 0000 00.. = Differentiated Services Codepoint: Default (0) + * .... ..00 = Explicit Congestion Notification: Not ECN-Capable Transport (0) + * Total Length: 117 + * Identification: 0x0000 (0) + * 010. .... = Flags: 0x2, Don't fragment + * 0... .... = Reserved bit: Not set + * .1.. .... = Don't fragment: Set + * ..0. .... = More fragments: Not set + * ...0 0000 0000 0000 = Fragment Offset: 0 + * Time to Live: 64 + * Protocol: TCP (6) + * Header Checksum: 0xd557 [correct] + * [Header checksum status: Good] + * [Calculated Checksum: 0xd557] + * Source Address: 192.168.38.63 + * Destination Address: 182.61.200.6 + * Transmission Control Protocol, Src Port: 57016, Dst Port: 80, Seq: 1, Ack: 1, Len: 77 + * Source Port: 57016 + * Destination Port: 80 + * [Stream index: 9] + * [Conversation completeness: Complete, WITH_DATA (31)] + * [TCP Segment Len: 77] + * Sequence Number: 1 (relative sequence number) + * Sequence Number (raw): 1965697618 + * [Next Sequence Number: 78 (relative sequence number)] + * Acknowledgment Number: 1 (relative ack number) + * Acknowledgment number (raw): 4259318185 + * 0101 .... = Header Length: 20 bytes (5) + * Flags: 0x018 (PSH, ACK) + * 000. .... .... = Reserved: Not set + * ...0 .... .... = Accurate ECN: Not set + * .... 0... .... = Congestion Window Reduced: Not set + * .... .0.. .... = ECN-Echo: Not set + * .... ..0. .... = Urgent: Not set + * .... ...1 .... = Acknowledgment: Set + * .... .... 1... = Push: Set + * .... .... .0.. = Reset: Not set + * .... .... ..0. = Syn: Not set + * .... .... ...0 = Fin: Not set + * [TCP Flags: ·······AP···] + * Window: 4096 + * [Calculated window size: 262144] + * [Window size scaling factor: 64] + * Checksum: 0x7f51 [correct] + * [Checksum Status: Good] + * [Calculated Checksum: 0x7f51] + * Urgent Pointer: 0 + * [Timestamps] + * [Time since first frame in this TCP stream: 0.010626000 seconds] + * [Time since previous frame in this TCP stream: 0.000042000 seconds] + * [SEQ/ACK analysis] + * [iRTT: 0.010584000 seconds] + * [Bytes in flight: 77] + * [Bytes sent since last PSH flag: 77] + * TCP payload (77 bytes) + * Hypertext Transfer Protocol + * GET / HTTP/1.1\r\n + * [Expert Info (Chat/Sequence): GET / HTTP/1.1\r\n] + * [GET / HTTP/1.1\r\n] + * [Severity level: Chat] + * [Group: Sequence] + * Request Method: GET + * Request URI: / + * Request Version: HTTP/1.1 + * Host: www.baidu.com\r\n + * User-Agent: curl/7.64.1\r\n + */ + // Accept: */*\r\n + /* \r\n + * [Full request URI: http://www.baidu.com/] + * [HTTP request 1/1] + * [Response in frame: 220] + */ + + #[test] + fn test_event_manager() { + let bytes = [ + 0x48, 0x73, 0x97, 0x96, 0x38, 0x0e, 0x3c, 0xa6, 0xf6, 0x0a, 0xc5, 0xea, 0x08, 0x00, + 0x45, 0x00, 0x00, 0x75, 0x00, 0x00, 0x40, 0x00, 0x40, 0x06, 0xd5, 0x57, 0xc0, 0xa8, + 0x26, 0x3f, 0xb6, 0x3d, 0xc8, 0x06, 0xde, 0xb8, 0x00, 0x50, 0x75, 0x2a, 0x2a, 0x52, + 0xfd, 0xe0, 0x09, 0xa9, 0x50, 0x18, 0x10, 0x00, 0x7f, 0x51, 0x00, 0x00, 0x47, 0x45, + 0x54, 0x20, 0x2f, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, 0x2e, 0x31, 0x0d, 0x0a, + 0x48, 0x6f, 0x73, 0x74, 0x3a, 0x20, 0x77, 0x77, 0x77, 0x2e, 0x62, 0x61, 0x69, 0x64, + 0x75, 0x2e, 0x63, 0x6f, 0x6d, 0x0d, 0x0a, 0x55, 0x73, 0x65, 0x72, 0x2d, 0x41, 0x67, + 0x65, 0x6e, 0x74, 0x3a, 0x20, 0x63, 0x75, 0x72, 0x6c, 0x2f, 0x37, 0x2e, 0x36, 0x34, + 0x2e, 0x31, 0x0d, 0x0a, 0x41, 0x63, 0x63, 0x65, 0x70, 0x74, 0x3a, 0x20, 0x2a, 0x2f, + 0x2a, 0x0d, 0x0a, 0x0d, 0x0a, /* HTTP END */ + ]; + + // Create plugin + let plugin = ExamplePulgin::new( + "Example Plugin", + Rc::new(RefCell::new(String::from("Hello"))), + ); + + // Create event manager + let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); + let mgr = thread_ctx.borrow_mut().get_event_mgr(); + + // Register event + let l2_event = mgr.borrow_mut().event2index("BUILDIN_L2_EVENT"); + let l3_event = mgr.borrow_mut().event2index("BUILDIN_L3_EVENT"); + let ip4_event = mgr.borrow_mut().event2index("BUILDIN_IP4_EVENT"); + let ip6_event = mgr.borrow_mut().event2index("BUILDIN_IP6_EVENT"); + let l4_event = mgr.borrow_mut().event2index("BUILDIN_L4_EVENT"); + let tcp_opening_event = mgr.borrow_mut().event2index("BUILDIN_TCP_OPENING_EVENT"); + let tcp_active_event = mgr.borrow_mut().event2index("BUILDIN_TCP_ACTIVE_EVENT"); + let tcp_closed_event = mgr.borrow_mut().event2index("BUILDIN_TCP_CLOSED_EVENT"); + let udp_event = mgr.borrow_mut().event2index("BUILDIN_UDP_EVENT"); + let l7_event = mgr.borrow_mut().event2index("BUILDIN_L7_EVENT"); + let dns_event = mgr.borrow_mut().event2index("BUILDIN_DNS_EVENT"); + let http_event = mgr.borrow_mut().event2index("BUILDIN_HTTP_EVENT"); + + mgr.borrow_mut() + .register(l2_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(l3_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(ip4_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(ip6_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(l4_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(tcp_opening_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(tcp_active_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(tcp_closed_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(udp_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(l7_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(dns_event, Box::new(plugin.clone())); + mgr.borrow_mut() + .register(http_event, Box::new(plugin.clone())); + + // Handle packet + let mut packet = Packet::new(&bytes, bytes.len() as u32); + let result = packet.handle(thread_ctx); + match result { + Ok(_v) => { + // println!("SUCCESS: {:?}, {:?}", packet, _v); + // println!("SUCCESS: {:#?}, {:?}", packet, _v); + // dbg!(packet); + } + Err(e) => { + println!("ERROR Data: {:?}", packet); + println!("ERROR Code: {:?}", e); + // println!("ERROR Desc: {}", e); + assert_eq!(0, 1); + } + } + + // Dispatch event + mgr.borrow_mut().dispatch(&packet); + + // assert_eq!(1, 0); + } +} diff --git a/src/event/mod.rs b/src/event/mod.rs index 53f1126..7003034 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -1 +1,2 @@ pub mod event; +pub mod manager; @@ -1,4 +1,6 @@ pub mod packet; pub mod protocol; pub mod event; -pub mod session;
\ No newline at end of file +pub mod session; +pub mod plugin; +pub mod thread;
\ No newline at end of file diff --git a/src/main.rs b/src/main.rs index d4cad89..1bd173b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,22 @@ +use std::cell::RefCell; +use std::rc::Rc; +use stellar_rs::event::event::*; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Packet; +use stellar_rs::plugin::example::ExamplePulgin; +use stellar_rs::thread::thread::ThreadContex; -fn packet_callback(data: &[u8], len: u32) { +fn packet_callback(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { + let event_mgr = ctx.borrow_mut().get_event_mgr(); + let session_mgr = ctx.borrow_mut().get_session_mgr(); + + let empty = Packet::new(b"", 0); let mut packet = Packet::new(data, len); - let result = packet.handle(); + let result = packet.handle(ctx); match result { - Ok(v) => { - // println!("SUCCESS: {:?}, {:?}", packet, v); - // println!("SUCCESS: {:#?}, {:?}", packet, v); + Ok(_left) => { + // println!("SUCCESS: {:?}, {:?}", packet, left); + // println!("SUCCESS: {:#?}, {:?}", packet, left); // dbg!(packet); } Err(e) => { @@ -16,10 +25,76 @@ fn packet_callback(data: &[u8], len: u32) { println!("ERROR Desc: {}", e); } } + // Hanlde Packet Event + event_mgr.borrow_mut().dispatch(&packet); + + // Hanlde Expire Event + session_mgr.borrow_mut().expire_sessions(); + event_mgr.borrow_mut().dispatch(&empty); } fn main() { + let plugin = ExamplePulgin::new( + "Example Plugin", + Rc::new(RefCell::new(String::from("Hello"))), + ); + + let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); + let mgr = thread_ctx.borrow_mut().get_event_mgr(); + + // let l2_event = mgr.borrow_mut().event2index(BUILDIN_L2_EVENT); + // mgr.borrow_mut() + // .register(l2_event, Box::new(plugin.clone())); + + // let l3_event = mgr.borrow_mut().event2index(BUILDIN_L3_EVENT); + // mgr.borrow_mut() + // .register(l3_event, Box::new(plugin.clone())); + + // let ip4_event = mgr.borrow_mut().event2index(BUILDIN_IP4_EVENT); + // mgr.borrow_mut() + // .register(ip4_event, Box::new(plugin.clone())); + + // let ip6_event = mgr.borrow_mut().event2index(BUILDIN_IP6_EVENT); + // mgr.borrow_mut() + // .register(ip6_event, Box::new(plugin.clone())); + + // let l4_event = mgr.borrow_mut().event2index(BUILDIN_L4_EVENT); + // mgr.borrow_mut() + // .register(l4_event, Box::new(plugin.clone())); + + let tcp_opening_event = mgr.borrow_mut().event2index(BUILDIN_TCP_OPENING_EVENT); + mgr.borrow_mut() + .register(tcp_opening_event, Box::new(plugin.clone())); + + let tcp_active_event = mgr.borrow_mut().event2index(BUILDIN_TCP_ACTIVE_EVENT); + mgr.borrow_mut() + .register(tcp_active_event, Box::new(plugin.clone())); + + let tcp_expire_event = mgr.borrow_mut().event2index(BUILDIN_TCP_EXPIRE_EVENT); + mgr.borrow_mut() + .register(tcp_expire_event, Box::new(plugin.clone())); + + let tcp_closed_event = mgr.borrow_mut().event2index(BUILDIN_TCP_CLOSED_EVENT); + mgr.borrow_mut() + .register(tcp_closed_event, Box::new(plugin.clone())); + + // let udp_event = mgr.borrow_mut().event2index(BUILDIN_UDP_EVENT); + // mgr.borrow_mut() + // .register(udp_event, Box::new(plugin.clone())); + + // let l7_event = mgr.borrow_mut().event2index(BUILDIN_L7_EVENT); + // mgr.borrow_mut() + // .register(l7_event, Box::new(plugin.clone())); + + // let dns_event = mgr.borrow_mut().event2index(BUILDIN_DNS_EVENT); + // mgr.borrow_mut() + // .register(dns_event, Box::new(plugin.clone())); + + // let http_event = mgr.borrow_mut().event2index(BUILDIN_HTTP_EVENT); + // mgr.borrow_mut() + // .register(http_event, Box::new(plugin.clone())); + PacketCapture::show_devices(); let mut cap = PacketCapture::new("en0"); - cap.poll_packet(packet_callback); + cap.poll_packet(packet_callback, thread_ctx); } diff --git a/src/packet/capture.rs b/src/packet/capture.rs index aa1204d..fbbf6d7 100644 --- a/src/packet/capture.rs +++ b/src/packet/capture.rs @@ -1,4 +1,7 @@ +use crate::thread::thread::ThreadContex; use pcap::Capture; +use std::cell::RefCell; +use std::rc::Rc; pub struct PacketCapture { capture: Capture<pcap::Active>, @@ -7,7 +10,7 @@ pub struct PacketCapture { impl PacketCapture { pub fn new(device: &str) -> Self { println!("Packet Capture Open Device {:?}", device); - let mut capture = Capture::from_device(device) + let capture = Capture::from_device(device) .unwrap() .immediate_mode(true) .open() @@ -16,13 +19,18 @@ impl PacketCapture { PacketCapture { capture } } - pub fn poll_packet(&mut self, callback: fn(data: &[u8], len: u32)) { + pub fn poll_packet( + &mut self, + callback: fn(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>), + ctx: Rc<RefCell<ThreadContex>>, + ) { let mut packet_num = 0; while let Ok(packet) = self.capture.next_packet() { packet_num += 1; + println!("\n==================== New Packet ====================\n"); println!("Packet[{}]->header : {:?}", packet_num, packet.header); println!("Packet[{}]->data : {:?}", packet_num, packet.data); - callback(&packet.data, packet.header.len); + callback(&packet.data, packet.header.len, ctx.clone()); } } diff --git a/src/packet/error.rs b/src/packet/error.rs index 20eceec..fe0620e 100644 --- a/src/packet/error.rs +++ b/src/packet/error.rs @@ -45,4 +45,4 @@ impl core::fmt::Display for PacketError { PacketError::UnsupportAppProtocol => write!(f, "Unsupport App Protocol"), } } -}
\ No newline at end of file +} diff --git a/src/packet/packet.rs b/src/packet/packet.rs index 40403cb..8b2d460 100644 --- a/src/packet/packet.rs +++ b/src/packet/packet.rs @@ -1,3 +1,4 @@ +use crate::event::event::BuildInEvent; use crate::packet::error::PacketError; use crate::protocol::codec::Decode; use crate::protocol::dns::DNS_MESSAGE; @@ -9,6 +10,13 @@ use crate::protocol::ipv4::IPv4Header; use crate::protocol::ipv6::IPv6Header; use crate::protocol::tcp::TcpHeader; use crate::protocol::udp::UdpHeader; +use crate::session::session::packet2session; +use crate::session::session::Session; +use crate::session::session::SessionProto; +use crate::session::session::SessionState; +use crate::thread::thread::ThreadContex; +use std::cell::RefCell; +use std::rc::Rc; #[allow(non_camel_case_types)] #[derive(Clone, Debug, PartialEq)] @@ -43,11 +51,11 @@ impl Packet<'_> { } } - pub fn handle(&mut self) -> Result<(), PacketError> { + pub fn handle(&mut self, ctx: Rc<RefCell<ThreadContex>>) -> Result<(), PacketError> { if self.orig_data.len() != self.orig_len as usize { return Err(PacketError::InvalidPacketLength); } - return handle_l2(self, self.orig_data); + return handle_l2(self, self.orig_data, ctx); } pub fn get_outer_l3_layer(&self) -> Option<Encapsulation> { @@ -301,17 +309,117 @@ impl Packet<'_> { return None; } + + pub fn get_trace_id(&self) -> Option<String> { + let num = self.encapsulation.len(); + let mut trace_id = String::new(); + for i in 0..num { + match self.encapsulation[i] { + Encapsulation::L3_IP4(ref l3_header, _) => { + trace_id.push_str("IP4->IP4;"); + trace_id.push_str(&l3_header.source_address.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l3_header.dest_address.to_string()); + trace_id.push_str(";"); + } + Encapsulation::L3_IP6(ref l3_header, _) => { + trace_id.push_str("IP6->IP6;"); + trace_id.push_str(&l3_header.source_address.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l3_header.dest_address.to_string()); + trace_id.push_str(";"); + } + Encapsulation::L4_TCP(ref l4_header, _) => { + trace_id.push_str("TCP->TCP;"); + trace_id.push_str(&l4_header.source_port.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l4_header.dest_port.to_string()); + trace_id.push_str(";"); + } + Encapsulation::L4_UDP(ref l4_header, _) => { + trace_id.push_str("UDP->UDP;"); + trace_id.push_str(&l4_header.source_port.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l4_header.dest_port.to_string()); + trace_id.push_str(";"); + } + _ => continue, + } + } + + Some(trace_id) + } + + pub fn get_reversed_trace_id(&self) -> Option<String> { + let num = self.encapsulation.len(); + let mut trace_id = String::new(); + for i in 0..num { + match self.encapsulation[i] { + Encapsulation::L3_IP4(ref l3_header, _) => { + trace_id.push_str("IP4->IP4;"); + trace_id.push_str(&l3_header.dest_address.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l3_header.source_address.to_string()); + trace_id.push_str(";"); + } + Encapsulation::L3_IP6(ref l3_header, _) => { + trace_id.push_str("IP6->IP6;"); + trace_id.push_str(&l3_header.dest_address.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l3_header.source_address.to_string()); + trace_id.push_str(";"); + } + Encapsulation::L4_TCP(ref l4_header, _) => { + trace_id.push_str("TCP->TCP;"); + trace_id.push_str(&l4_header.dest_port.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l4_header.source_port.to_string()); + trace_id.push_str(";"); + } + Encapsulation::L4_UDP(ref l4_header, _) => { + trace_id.push_str("UDP->UDP;"); + trace_id.push_str(&l4_header.dest_port.to_string()); + trace_id.push_str("->"); + trace_id.push_str(&l4_header.source_port.to_string()); + trace_id.push_str(";"); + } + _ => continue, + } + } + + Some(trace_id) + } } -fn handle_l2<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +pub fn trace_id_reversed(trace_id: &String) -> String { + let mut reversed_trace_id = String::new(); + let mut trace_id_vec: Vec<&str> = trace_id.split(";").collect(); + trace_id_vec.pop(); + for item in trace_id_vec.iter() { + let mut item_vec: Vec<&str> = item.split("->").collect(); + item_vec.reverse(); + reversed_trace_id.push_str(&item_vec.join("->")); + reversed_trace_id.push_str(";"); + } + reversed_trace_id +} + +fn handle_l2<'a>( + packet: &mut Packet<'a>, + input: &'a [u8], + ctx: Rc<RefCell<ThreadContex>>, +) -> Result<(), PacketError> { + let event_mgr = ctx.borrow().get_event_mgr(); let result = EthernetFrame::decode(input); if let Ok((payload, header)) = result { dbg!(&header); packet .encapsulation .push(Encapsulation::L2_ETH(header, payload)); - return handle_l3(packet, payload, header.ether_type); + BuildInEvent::trigger_l2_event(event_mgr, None); + return handle_l3(packet, payload, header.ether_type, ctx); } else { + BuildInEvent::trigger_l2_event(event_mgr, None); return Err(PacketError::IncompleteEthernetFrame); } } @@ -320,7 +428,9 @@ fn handle_l3<'a>( packet: &mut Packet<'a>, input: &'a [u8], next_proto: EtherType, + ctx: Rc<RefCell<ThreadContex>>, ) -> Result<(), PacketError> { + let event_mgr = ctx.borrow().get_event_mgr(); match next_proto { EtherType::IPv4 => { let result = IPv4Header::decode(input); @@ -337,8 +447,11 @@ fn handle_l3<'a>( // TODO IPv4 Fragment - return handle_l4(packet, payload, header.protocol); + BuildInEvent::trigger_l3_event(event_mgr.clone(), None); + BuildInEvent::trigger_ip4_event(event_mgr, None); + return handle_l4(packet, payload, header.protocol, ctx); } else { + BuildInEvent::trigger_l3_event(event_mgr, None); return Err(PacketError::IncompleteIpv4Header); } } @@ -357,12 +470,16 @@ fn handle_l3<'a>( // TODO IPv6 Fragment - return handle_l4(packet, payload, header.next_header); + BuildInEvent::trigger_l3_event(event_mgr.clone(), None); + BuildInEvent::trigger_ip6_event(event_mgr, None); + return handle_l4(packet, payload, header.next_header, ctx); } else { + BuildInEvent::trigger_l3_event(event_mgr, None); return Err(PacketError::IncompleteIpv6Header); } } _e => { + BuildInEvent::trigger_l3_event(event_mgr, None); return Err(PacketError::UnsupportEthernetType); } } @@ -372,7 +489,10 @@ fn handle_l4<'a>( packet: &mut Packet<'a>, input: &'a [u8], next_proto: IPProtocol, + ctx: Rc<RefCell<ThreadContex>>, ) -> Result<(), PacketError> { + let event_mgr = ctx.borrow().get_event_mgr(); + let session_mgr = ctx.borrow().get_session_mgr(); match next_proto { IPProtocol::UDP => { let result = UdpHeader::decode(input); @@ -381,8 +501,24 @@ fn handle_l4<'a>( packet .encapsulation .push(Encapsulation::L4_UDP(header, payload)); - return handle_l7(packet, payload); + + let session = packet2session(&packet, session_mgr); + BuildInEvent::trigger_l4_event(event_mgr.clone(), Some(session.clone())); + session.borrow_mut().set_session_proto(SessionProto::UDP); + match session.borrow().get_session_state() { + SessionState::New => { + BuildInEvent::trigger_udp_opening_event(event_mgr, Some(session.clone())); + } + SessionState::Active => { + BuildInEvent::trigger_udp_active_event(event_mgr, Some(session.clone())); + } + SessionState::Inactive | SessionState::Expired => { + BuildInEvent::trigger_udp_expire_event(event_mgr, Some(session.clone())); + } + } + return handle_l7(packet, payload, session, ctx); } else { + BuildInEvent::trigger_l4_event(event_mgr, None); return Err(PacketError::IncompleteUdpHeader); } } @@ -390,22 +526,57 @@ fn handle_l4<'a>( let result = TcpHeader::decode(input); if let Ok((payload, header)) = result { dbg!(&header); + + let tcp_need_closed = header.flag_rst || header.flag_fin; packet .encapsulation .push(Encapsulation::L4_TCP(header, payload)); + // TODO TCP Reassembly - return handle_l7(packet, payload); + + let session = packet2session(&packet, session_mgr); + BuildInEvent::trigger_l4_event(event_mgr.clone(), Some(session.clone())); + if tcp_need_closed { + session + .borrow_mut() + .set_session_state(SessionState::Inactive); + } + session.borrow_mut().set_session_proto(SessionProto::TCP); + match session.borrow().get_session_state() { + SessionState::New => { + BuildInEvent::trigger_tcp_opening_event(event_mgr, Some(session.clone())); + } + SessionState::Active => { + BuildInEvent::trigger_tcp_active_event(event_mgr, Some(session.clone())); + } + SessionState::Expired => { + BuildInEvent::trigger_tcp_expire_event(event_mgr, Some(session.clone())); + } + SessionState::Inactive => { + BuildInEvent::trigger_tcp_closed_event(event_mgr, Some(session.clone())); + } + } + + return handle_l7(packet, payload, session, ctx); } else { + BuildInEvent::trigger_l4_event(event_mgr, None); return Err(PacketError::IncompleteTcpHeader); } } _e => { + BuildInEvent::trigger_l4_event(event_mgr, None); return Err(PacketError::UnsupportIPProtocol); } } } -fn handle_l7<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_l7<'a>( + packet: &mut Packet<'a>, + input: &'a [u8], + session: Rc<RefCell<Session>>, + ctx: Rc<RefCell<ThreadContex>>, +) -> Result<(), PacketError> { + let event_mgr = ctx.borrow().get_event_mgr(); let next_proto = l7_identify(packet); match next_proto { L7Protocol::DNS => { @@ -415,8 +586,11 @@ fn handle_l7<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketE packet .encapsulation .push(Encapsulation::L7_DNS(header, payload)); + BuildInEvent::trigger_l7_event(event_mgr.clone(), Some(session.clone())); + BuildInEvent::trigger_dns_event(event_mgr, Some(session)); return Ok(()); } else { + BuildInEvent::trigger_l7_event(event_mgr, Some(session)); return Err(PacketError::IncompleteAppHeader); } } @@ -427,12 +601,16 @@ fn handle_l7<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketE packet .encapsulation .push(Encapsulation::L7_HTTP(header, payload)); + BuildInEvent::trigger_l7_event(event_mgr.clone(), Some(session.clone())); + BuildInEvent::trigger_http_event(event_mgr, Some(session)); return Ok(()); } else { + BuildInEvent::trigger_l7_event(event_mgr, Some(session)); return Err(PacketError::IncompleteAppHeader); } } L7Protocol::Unsupported => { + BuildInEvent::trigger_l7_event(event_mgr, Some(session)); return Err(PacketError::UnsupportAppProtocol); } } @@ -470,13 +648,17 @@ fn l7_identify(packet: &Packet) -> L7Protocol { mod tests { use super::Encapsulation; use super::Packet; + use crate::packet::packet::trace_id_reversed; use crate::protocol::ip::IPProtocol; use crate::protocol::ipv4::IPv4Header; use crate::protocol::ipv6::IPv6Header; use crate::protocol::tcp::TcpHeader; use crate::protocol::udp::UdpHeader; + use crate::thread::thread::ThreadContex; + use std::cell::RefCell; use std::net::Ipv4Addr; use std::net::Ipv6Addr; + use std::rc::Rc; #[test] fn test_packet_handle1() { @@ -574,8 +756,9 @@ mod tests { 0x00, 0x01, 0x00, 0x01, /* DNS END */ ]; + let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); let mut packet = Packet::new(&bytes, bytes.len() as u32); - let result = packet.handle(); + let result = packet.handle(thread_ctx); match result { Ok(v) => { @@ -715,8 +898,9 @@ mod tests { 0x2a, 0x0d, 0x0a, 0x0d, 0x0a, /* HTTP END */ ]; + let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); let mut packet = Packet::new(&bytes, bytes.len() as u32); - let result = packet.handle(); + let result = packet.handle(thread_ctx); match result { Ok(v) => { @@ -853,5 +1037,9 @@ mod tests { packet.get_inner_l4_layer(), Some(Encapsulation::L4_UDP(udp_hdr, b"4")) ); + + assert_eq!(packet.get_trace_id(), Some("IP4->IP4;192.168.0.101->121.14.154.93;TCP->TCP;50081->443;IP6->IP6;2409:8034:4025::50:a31->2409:8034:4040:5301::204;UDP->UDP;9993->9994;".to_string())); + assert_eq!(packet.get_reversed_trace_id() ,Some("IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string())); + assert_eq!(trace_id_reversed(&packet.get_trace_id().unwrap()), "IP4->IP4;121.14.154.93->192.168.0.101;TCP->TCP;443->50081;IP6->IP6;2409:8034:4040:5301::204->2409:8034:4025::50:a31;UDP->UDP;9994->9993;".to_string()); } } diff --git a/src/plugin/example.rs b/src/plugin/example.rs new file mode 100644 index 0000000..68fd542 --- /dev/null +++ b/src/plugin/example.rs @@ -0,0 +1,50 @@ +use crate::event::manager::EventHandle; +use crate::packet::packet::Packet; +use crate::session::session::Session; +use std::cell::RefCell; +use std::rc::Rc; + +#[derive(Debug, Clone)] +pub struct ExamplePulgin { + plugin_name: &'static str, + plugin_ctx: Rc<RefCell<String>>, + called_times: u64, +} + +impl ExamplePulgin { + pub fn new(plugin_name: &'static str, plugin_ctx: Rc<RefCell<String>>) -> ExamplePulgin { + ExamplePulgin { + plugin_name, + plugin_ctx, + called_times: 0, + } + } +} + +impl EventHandle for ExamplePulgin { + fn handle(&mut self, index: usize, packet: &Packet, session: Option<Rc<RefCell<Session>>>) { + self.called_times += 1; + self.plugin_ctx.borrow_mut().clear(); + self.plugin_ctx.borrow_mut().push_str("1"); + + println!("{} handle event : {:?}", self.plugin_name, index); + println!( + "{} handle times : {:?}", + self.plugin_name, self.called_times + ); + println!( + "{} handle tuple : {:?}", + self.plugin_name, + packet.get_outer_tuple() + ); + + if session.is_none() { + return; + } + let session = session.unwrap(); + println!("{} handle session : {:?}", self.plugin_name, session); + session + .borrow_mut() + .set_session_exdata("example pulgin".to_string(), "success".to_string()); + } +} diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs new file mode 100644 index 0000000..10f592e --- /dev/null +++ b/src/plugin/mod.rs @@ -0,0 +1 @@ +pub mod example;
\ No newline at end of file diff --git a/src/session/manager.rs b/src/session/manager.rs index d02a5db..2a63bbf 100644 --- a/src/session/manager.rs +++ b/src/session/manager.rs @@ -1,7 +1,11 @@ +use crate::packet::packet::trace_id_reversed; use crate::session::session::Session; +use crate::session::session::SessionProto; use crate::session::session::SessionState; use chrono::Utc; +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Rc; /****************************************************************************** * Struct @@ -9,7 +13,7 @@ use std::collections::HashMap; #[derive(Debug)] pub struct SessionManager { - sessions: HashMap<String, Session>, + sessions: HashMap<String, Rc<RefCell<Session>>>, } /****************************************************************************** @@ -23,37 +27,59 @@ impl SessionManager { } } - pub fn get_session(&self, session_id: &str) -> Option<&Session> { - self.sessions.get(session_id) - } + pub fn get_session(&self, session_id: &String) -> Option<Rc<RefCell<Session>>> { + let result = self.sessions.get(session_id).map(|session| session.clone()); + if result.is_some() { + return result; + } - pub fn get_session_mut(&mut self, session_id: &str) -> Option<&mut Session> { - self.sessions.get_mut(session_id) + let reversed_id = trace_id_reversed(session_id); + self.sessions + .get(reversed_id.as_str()) + .map(|session| session.clone()) } - pub fn insert_session(&mut self, session_id: String, session: Session) { + pub fn insert_session(&mut self, session_id: String, session: Rc<RefCell<Session>>) { self.sessions.insert(session_id, session); } - pub fn remove_session(&mut self, session_id: &str) -> Option<Session> { - self.sessions.remove(session_id) + pub fn remove_session(&mut self, session_id: &str) -> Option<Rc<RefCell<Session>>> { + let result = self.sessions.remove(session_id); + if result.is_some() { + return result; + } + + let reversed_id = trace_id_reversed(&session_id.to_string()); + self.sessions.remove(reversed_id.as_str()) } pub fn expire_sessions(&mut self) { let now = Utc::now().timestamp(); let mut expired_sessions = Vec::with_capacity(1024); for (session_id, session) in &self.sessions { - if session.get_session_expire_ts() < now { + if session.borrow().get_session_expire_ts() < now { expired_sessions.push(session_id.clone()); } } for session_id in expired_sessions { let option = self.remove_session(&session_id); - if let Some(mut session) = option { + if let Some(session) = option { print!("Session expired: {}", session_id); - session.set_session_state(SessionState::Expired); - session.set_session_end_ts(Utc::now().timestamp()); - // TODO trigger session expire event or call session's on_expire() method + session + .borrow_mut() + .set_session_state(SessionState::Expired); + session + .borrow_mut() + .set_session_end_ts(Utc::now().timestamp()); + + match session.borrow().get_session_proto() { + SessionProto::TCP => { + //BuildInEvent::trigger_tcp_expire_event(event_mgr, Some(session)); + } + SessionProto::UDP => { + //BuildInEvent::trigger_udp_expire_event(event_mgr, Some(session)); + } + } } } } @@ -67,59 +93,41 @@ impl SessionManager { mod tests { use super::SessionManager; use crate::session::session::Session; - use crate::session::session::SessionState; - use crate::session::tuple::FiveTuple; - use chrono::Utc; - use std::net::Ipv4Addr; + use std::cell::RefCell; + use std::rc::Rc; + // use std::{thread, time}; #[test] fn test_session_manager() { + let session_id = "IP4->IP4;192.168.0.1->192.168.0.2;UDP->UDP;2345->80;".to_string(); + let reversed_id = "IP4->IP4;192.168.0.2->192.168.0.1;UDP->UDP;80->2345;".to_string(); + let session = Rc::new(RefCell::new(Session::new(session_id.clone()))); + // Create Session Manager let mut session_mgr = SessionManager::new(1024); - let five_tuple_4 = FiveTuple::from_v4( - Ipv4Addr::new(192, 168, 0, 1), - Ipv4Addr::new(192, 168, 0, 2), - 2345, - 80, - 1, - ); - let session_id = five_tuple_4.to_string(); - // Search Session - let option = session_mgr.get_session_mut(session_id.as_str()); - assert_eq!(option.is_none(), true); + assert_eq!(session_mgr.get_session(&session_id).is_none(), true); // Insert Session - let mut session = Session::new(five_tuple_4); - session.inc_session_c2s_metrics(0, 2, 0, 120); - session.set_session_exdata("Hello".to_string(), "Word".to_string()); session_mgr.insert_session(session_id.clone(), session); // Update Session - let option = session_mgr.get_session_mut(session_id.as_str()); - assert_eq!(option.is_some(), true); - if let Some(session) = option { - session.set_session_exdata("Hello".to_string(), "Word!!!".to_string()); - session.set_session_last_seen_ts(Utc::now().timestamp()); - session.set_session_state(SessionState::Active); - } + assert_eq!(session_mgr.get_session(&session_id).is_some(), true); + assert_eq!(session_mgr.get_session(&reversed_id).is_some(), true); // Expire Session // thread::sleep(time::Duration::from_secs(61)); // session_mgr.expire_sessions(); // Delete session - let option = session_mgr.remove_session(session_id.as_str()); - assert_eq!(option.is_some(), true); + assert_eq!(session_mgr.remove_session(&reversed_id).is_some(), true); // Research Session - let option = session_mgr.get_session_mut(session_id.as_str()); - assert_eq!(option.is_none(), true); + assert_eq!(session_mgr.get_session(&session_id).is_none(), true); + assert_eq!(session_mgr.get_session(&reversed_id).is_none(), true); dbg!(session_mgr); - - // assert_eq!(1, 0); } } diff --git a/src/session/session.rs b/src/session/session.rs index 36d899c..41bd54a 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1,11 +1,16 @@ -use crate::session::tuple::FiveTuple; +use crate::packet::packet::Packet; +use crate::session::manager::SessionManager; use chrono::Utc; +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Rc; /****************************************************************************** * Struct ******************************************************************************/ +const MAX_SESSION_EXPIRE_TIME: i64 = 60; + #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum SessionDirection { C2S, @@ -13,6 +18,12 @@ pub enum SessionDirection { } #[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SessionProto { + TCP, + UDP, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum SessionState { New, Active, @@ -22,17 +33,10 @@ pub enum SessionState { #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct SessionMetrics { - pub c2s_pkts_sent: u64, - pub c2s_pkts_recv: u64, - - pub c2s_bytes_sent: u64, - pub c2s_bytes_recv: u64, - - pub s2c_pkts_sent: u64, - pub s2c_pkts_recv: u64, - - pub s2c_bytes_sent: u64, - pub s2c_bytes_recv: u64, + pub send_pkts: u64, + pub send_bytes: u64, + pub recv_pkts: u64, + pub recv_bytes: u64, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -40,7 +44,7 @@ struct SessionTimeStamp { ts_start: i64, ts_end: i64, ts_expires: i64, - ts_last_seen: i64, + ts_last: i64, } /****************************************************************************** @@ -52,9 +56,10 @@ struct SessionTimeStamp { #[derive(Clone, Debug, PartialEq, Eq)] pub struct Session { session_id: String, + session_proto: SessionProto, session_state: SessionState, - session_metrics: SessionMetrics, - session_five_tuple: FiveTuple, + session_metrics_c2s: SessionMetrics, + session_metrics_s2c: SessionMetrics, session_timestamp: SessionTimeStamp, session_exdata: HashMap<String, String>, session_current_dir: SessionDirection, @@ -65,33 +70,43 @@ pub struct Session { ******************************************************************************/ impl Session { - pub fn new(tuple: FiveTuple) -> Session { + pub fn new(session_id: String) -> Session { let timestamp = Utc::now().timestamp(); Session { - session_id: tuple.to_string(), + session_id, + session_proto: SessionProto::TCP, session_state: SessionState::New, - session_metrics: SessionMetrics { - c2s_pkts_sent: 0, - c2s_pkts_recv: 0, - c2s_bytes_sent: 0, - c2s_bytes_recv: 0, - s2c_pkts_sent: 0, - s2c_pkts_recv: 0, - s2c_bytes_sent: 0, - s2c_bytes_recv: 0, + session_metrics_c2s: SessionMetrics { + send_pkts: 0, + send_bytes: 0, + recv_pkts: 0, + recv_bytes: 0, + }, + session_metrics_s2c: SessionMetrics { + send_pkts: 0, + send_bytes: 0, + recv_pkts: 0, + recv_bytes: 0, }, - session_five_tuple: tuple, session_timestamp: SessionTimeStamp { ts_start: timestamp, ts_end: 0, - ts_expires: timestamp + 60, - ts_last_seen: timestamp, + ts_expires: timestamp + MAX_SESSION_EXPIRE_TIME, + ts_last: timestamp, }, session_exdata: HashMap::new(), session_current_dir: SessionDirection::C2S, } } + pub fn set_session_proto(&mut self, proto: SessionProto) { + self.session_proto = proto; + } + + pub fn get_session_proto(&self) -> SessionProto { + self.session_proto + } + pub fn get_session_id(&self) -> String { self.session_id.clone() } @@ -106,56 +121,56 @@ impl Session { pub fn inc_session_c2s_metrics( &mut self, - pkts_sent: u64, - pkts_recv: u64, - bytes_sent: u64, - bytes_recv: u64, + send_pkts: u64, + send_bytes: u64, + recv_pkts: u64, + recv_bytes: u64, ) { - self.session_metrics.c2s_pkts_sent += pkts_sent; - self.session_metrics.c2s_pkts_recv += pkts_recv; - self.session_metrics.c2s_bytes_sent += bytes_sent; - self.session_metrics.c2s_bytes_recv += bytes_recv; + self.session_metrics_c2s.send_pkts += send_pkts; + self.session_metrics_c2s.send_bytes += send_bytes; + self.session_metrics_c2s.recv_pkts += recv_pkts; + self.session_metrics_c2s.recv_bytes += recv_bytes; } pub fn inc_session_s2c_metrics( &mut self, - pkts_sent: u64, - pkts_recv: u64, - bytes_sent: u64, - bytes_recv: u64, + send_pkts: u64, + send_bytes: u64, + recv_pkts: u64, + recv_bytes: u64, ) { - self.session_metrics.s2c_pkts_sent += pkts_sent; - self.session_metrics.s2c_pkts_recv += pkts_recv; - self.session_metrics.s2c_bytes_sent += bytes_sent; - self.session_metrics.s2c_bytes_recv += bytes_recv; + self.session_metrics_s2c.send_pkts += send_pkts; + self.session_metrics_s2c.send_bytes += send_bytes; + self.session_metrics_s2c.recv_pkts += recv_pkts; + self.session_metrics_s2c.recv_bytes += recv_bytes; } - pub fn get_session_metrics(&self) -> SessionMetrics { - self.session_metrics + pub fn get_session_c2s_metrics(&self) -> SessionMetrics { + self.session_metrics_c2s } - pub fn get_session_five_tuple(&self) -> FiveTuple { - self.session_five_tuple + pub fn get_session_s2c_metrics(&self) -> SessionMetrics { + self.session_metrics_s2c } pub fn get_session_start_ts(&self) -> i64 { self.session_timestamp.ts_start } - pub fn set_session_expire_ts(&mut self, ts: i64) { - self.session_timestamp.ts_expires = ts; + pub fn update_session_expire_ts(&mut self) { + self.session_timestamp.ts_expires = Utc::now().timestamp() + MAX_SESSION_EXPIRE_TIME; } pub fn get_session_expire_ts(&self) -> i64 { self.session_timestamp.ts_expires } - pub fn set_session_last_seen_ts(&mut self, ts: i64) { - self.session_timestamp.ts_last_seen = ts; + pub fn update_session_last_ts(&mut self) { + self.session_timestamp.ts_last = Utc::now().timestamp(); } - pub fn get_session_last_seen_ts(&self) -> i64 { - self.session_timestamp.ts_last_seen + pub fn get_session_last_ts(&self) -> i64 { + self.session_timestamp.ts_last } pub fn set_session_end_ts(&mut self, ts: i64) { @@ -174,7 +189,7 @@ impl Session { self.session_exdata.get(&key) } - pub fn set_session_current_dir(&mut self, dir: SessionDirection) { + pub fn update_session_current_dir(&mut self, dir: SessionDirection) { self.session_current_dir = dir; } @@ -184,44 +199,83 @@ impl Session { } /****************************************************************************** + * API Packet -> Session + ******************************************************************************/ + +pub fn packet2session( + packet: &Packet, + session_mgr: Rc<RefCell<SessionManager>>, +) -> Rc<RefCell<Session>> { + let rc_session; + let packet_len = packet.orig_len as u64; + + let session_id = packet.get_trace_id().unwrap(); + let option = session_mgr.borrow().get_session(&session_id); + if option.is_none() { + let mut session = Session::new(session_id.clone()); + session.inc_session_c2s_metrics(0, 0, 1, packet_len); + session.update_session_current_dir(SessionDirection::C2S); + rc_session = Rc::new(RefCell::new(session)); + session_mgr + .borrow_mut() + .insert_session(session_id, rc_session.clone()); + } else { + rc_session = option.unwrap(); + rc_session + .borrow_mut() + .set_session_state(SessionState::Active); + + if rc_session.borrow().get_session_id() == session_id { + rc_session + .borrow_mut() + .inc_session_c2s_metrics(0, 0, 1, packet_len); + rc_session + .borrow_mut() + .update_session_current_dir(SessionDirection::C2S); + } else { + rc_session + .borrow_mut() + .inc_session_s2c_metrics(0, 0, 1, packet_len); + rc_session + .borrow_mut() + .update_session_current_dir(SessionDirection::S2C); + } + rc_session.borrow_mut().update_session_last_ts(); + rc_session.borrow_mut().update_session_expire_ts(); + } + + rc_session +} + +/****************************************************************************** * TEST ******************************************************************************/ #[cfg(test)] mod tests { use super::Session; - use crate::session::tuple::FiveTuple; - use std::net::Ipv4Addr; #[test] fn test_session() { - let five_tuple_4 = FiveTuple::from_v4( - Ipv4Addr::new(192, 168, 0, 1), - Ipv4Addr::new(192, 168, 0, 2), - 2345, - 80, - 1, - ); - - let mut session = Session::new(five_tuple_4); - + let mut session = Session::new("192.168.0.1:2345-192.168.0.2:80-1".to_string()); assert_eq!( session.get_session_id(), - "4:192.168.0.1:2345-192.168.0.2:80:1" + "192.168.0.1:2345-192.168.0.2:80-1" ); + assert_eq!(session.get_session_state(), super::SessionState::New); - assert_eq!(session.get_session_metrics().c2s_pkts_sent, 0); - assert_eq!(session.get_session_metrics().c2s_pkts_recv, 0); - assert_eq!(session.get_session_metrics().c2s_bytes_sent, 0); - assert_eq!(session.get_session_metrics().c2s_bytes_recv, 0); - assert_eq!(session.get_session_metrics().s2c_pkts_sent, 0); - assert_eq!(session.get_session_metrics().s2c_pkts_recv, 0); - assert_eq!(session.get_session_metrics().s2c_bytes_sent, 0); - assert_eq!(session.get_session_metrics().s2c_bytes_recv, 0); - assert_eq!(session.get_session_five_tuple(), five_tuple_4); + assert_eq!(session.get_session_c2s_metrics().recv_bytes, 0); + assert_eq!(session.get_session_c2s_metrics().recv_bytes, 0); + assert_eq!(session.get_session_c2s_metrics().send_pkts, 0); + assert_eq!(session.get_session_c2s_metrics().send_bytes, 0); + + assert_eq!(session.get_session_s2c_metrics().recv_bytes, 0); + assert_eq!(session.get_session_s2c_metrics().recv_bytes, 0); + assert_eq!(session.get_session_s2c_metrics().send_pkts, 0); + assert_eq!(session.get_session_s2c_metrics().send_pkts, 0); assert_eq!( session.get_session_start_ts(), - session.get_session_last_seen_ts() + session.get_session_last_ts() ); assert_eq!( session.get_session_start_ts(), diff --git a/src/session/tuple.rs b/src/session/tuple.rs index 08a4fe1..9a4dacf 100644 --- a/src/session/tuple.rs +++ b/src/session/tuple.rs @@ -1,30 +1,12 @@ -use std::net::Ipv4Addr; -use std::net::Ipv6Addr; - /****************************************************************************** * Struct ******************************************************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum Address { - V4(Ipv4Addr), - V6(Ipv6Addr), -} - -impl ToString for Address { - fn to_string(&self) -> String { - match self { - Address::V4(ip) => ip.to_string(), - Address::V6(ip) => ip.to_string(), - } - } -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct FiveTuple { - src_ip: Address, - dst_ip: Address, + src_ip: String, src_port: u16, + dst_ip: String, dst_port: u16, protocol: u8, } @@ -34,44 +16,24 @@ pub struct FiveTuple { ******************************************************************************/ impl FiveTuple { - pub fn from_v4( - src_ip: Ipv4Addr, - dst_ip: Ipv4Addr, - src_port: u16, - dst_port: u16, - protocol: u8, - ) -> FiveTuple { - FiveTuple { - src_ip: Address::V4(src_ip), - dst_ip: Address::V4(dst_ip), - src_port: src_port, - dst_port: dst_port, - protocol: protocol, - } - } - - pub fn from_v6( - src_ip: Ipv6Addr, - dst_ip: Ipv6Addr, + pub fn new( + src_ip: String, src_port: u16, + dst_ip: String, dst_port: u16, protocol: u8, ) -> FiveTuple { FiveTuple { - src_ip: Address::V6(src_ip), - dst_ip: Address::V6(dst_ip), - src_port: src_port, - dst_port: dst_port, - protocol: protocol, + src_ip, + src_port, + dst_ip, + dst_port, + protocol, } } pub fn to_string(&self) -> String { let mut session_id = String::new(); - match self.src_ip { - Address::V4(_) => session_id.push_str("4:"), - Address::V6(_) => session_id.push_str("6:"), - } session_id.push_str(&self.src_ip.to_string()); session_id.push_str(":"); session_id.push_str(&self.src_port.to_string()); @@ -79,7 +41,7 @@ impl FiveTuple { session_id.push_str(&self.dst_ip.to_string()); session_id.push_str(":"); session_id.push_str(&self.dst_port.to_string()); - session_id.push_str(":"); + session_id.push_str("-"); session_id.push_str(&self.protocol.to_string()); session_id } @@ -92,32 +54,30 @@ impl FiveTuple { #[cfg(test)] mod tests { use super::FiveTuple; - use std::net::Ipv4Addr; - use std::net::Ipv6Addr; #[test] fn test_five_tuple() { - let five_tuple_4 = FiveTuple::from_v4( - Ipv4Addr::new(192, 168, 0, 1), - Ipv4Addr::new(192, 168, 0, 2), + let five_tuple_4 = FiveTuple::new( + "192.168.0.1".to_string(), 2345, + "192.168.0.2".to_string(), 80, 1, ); let session_id_4 = five_tuple_4.to_string(); - assert_eq!(session_id_4, "4:192.168.0.1:2345-192.168.0.2:80:1"); + assert_eq!(session_id_4, "192.168.0.1:2345-192.168.0.2:80-1"); - let five_tuple_6 = FiveTuple::from_v6( - Ipv6Addr::new(0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88), - Ipv6Addr::new(0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff), + let five_tuple_6 = FiveTuple::new( + "11:22:33:44:55:66:77:88".to_string(), 2345, + "88:99:aa:bb:cc:dd:ee:ff".to_string(), 80, 1, ); let session_id_6 = five_tuple_6.to_string(); assert_eq!( session_id_6, - "6:11:22:33:44:55:66:77:88:2345-88:99:aa:bb:cc:dd:ee:ff:80:1" + "11:22:33:44:55:66:77:88:2345-88:99:aa:bb:cc:dd:ee:ff:80-1" ); } } diff --git a/src/thread/mod.rs b/src/thread/mod.rs new file mode 100644 index 0000000..0910418 --- /dev/null +++ b/src/thread/mod.rs @@ -0,0 +1 @@ +pub mod thread;
\ No newline at end of file diff --git a/src/thread/thread.rs b/src/thread/thread.rs new file mode 100644 index 0000000..d13c271 --- /dev/null +++ b/src/thread/thread.rs @@ -0,0 +1,26 @@ +use crate::event::manager::EventManager; +use crate::session::manager::SessionManager; +use std::cell::RefCell; +use std::rc::Rc; + +pub struct ThreadContex { + event_mgr: Rc<RefCell<EventManager>>, + session_mgr: Rc<RefCell<SessionManager>>, +} + +impl ThreadContex { + pub fn new() -> Self { + ThreadContex { + event_mgr: Rc::new(RefCell::new(EventManager::new())), + session_mgr: Rc::new(RefCell::new(SessionManager::new(4096))), + } + } + + pub fn get_event_mgr(&self) -> Rc<RefCell<EventManager>> { + self.event_mgr.clone() + } + + pub fn get_session_mgr(&self) -> Rc<RefCell<SessionManager>> { + self.session_mgr.clone() + } +} |
