diff options
| author | luwenpeng <[email protected]> | 2023-09-26 11:42:51 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2023-09-26 20:00:48 +0800 |
| commit | 521fbe5464652d509e3290fd336c87ba28fa24c0 (patch) | |
| tree | d7c0a0a7107852cff9947cb0c453e9f0310e91b5 | |
| parent | 3e2300a6abc592862397f66d66c8a2c811fc2ae4 (diff) | |
[refactor] Remove <Rc> from thread context; Rename event name; Add packet metrics
| -rw-r--r-- | src/event/event.rs | 52 | ||||
| -rw-r--r-- | src/event/manager.rs | 2 | ||||
| -rw-r--r-- | src/main.rs | 78 | ||||
| -rw-r--r-- | src/packet/capture.rs | 20 | ||||
| -rw-r--r-- | src/packet/error.rs | 83 | ||||
| -rw-r--r-- | src/packet/mod.rs | 2 | ||||
| -rw-r--r-- | src/packet/packet.rs | 90 | ||||
| -rw-r--r-- | src/packet/status.rs | 74 | ||||
| -rw-r--r-- | src/plugin/example.rs | 34 | ||||
| -rw-r--r-- | src/protocol/dns.rs | 14 | ||||
| -rw-r--r-- | src/protocol/ethernet.rs | 6 | ||||
| -rw-r--r-- | src/protocol/gtpv1.rs | 5 | ||||
| -rw-r--r-- | src/protocol/icmp.rs | 9 | ||||
| -rw-r--r-- | src/protocol/ip.rs | 6 | ||||
| -rw-r--r-- | src/protocol/ipv6.rs | 5 | ||||
| -rw-r--r-- | src/protocol/l2tp.rs | 9 | ||||
| -rw-r--r-- | src/protocol/ppp.rs | 4 | ||||
| -rw-r--r-- | src/protocol/pppoe.rs | 15 | ||||
| -rw-r--r-- | src/protocol/pptp.rs | 9 | ||||
| -rw-r--r-- | src/thread/thread.rs | 24 |
20 files changed, 267 insertions, 274 deletions
diff --git a/src/event/event.rs b/src/event/event.rs index deb8b6f..830bdda 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -4,23 +4,23 @@ pub enum Event { L3Event, L4EVENT, - Ipv4Event, - Ipv6Event, + IPv4Event, + IPv6Event, - TcpEvent, - UdpEvent, + TCPEvent, + UDPEvent, - TcpOpeningEvent, - TcpActiveEvent, - TcpExpireEvent, - TcpClosedEvent, + TCPOpeningEvent, + TCPActiveEvent, + TCPExpireEvent, + TCPClosedEvent, - UdpOpeningEvent, - UdpActiveEvent, - UdpExpireEvent, + UDPOpeningEvent, + UDPActiveEvent, + UDPExpireEvent, - HttpRequestEvent, - HttpResponseEvent, + HTTPRequestEvent, + HTTPResponseEvent, } impl core::fmt::Display for Event { @@ -30,23 +30,23 @@ impl core::fmt::Display for Event { Event::L3Event => write!(f, "L3Event"), Event::L4EVENT => write!(f, "L4EVENT"), - Event::Ipv4Event => write!(f, "Ipv4Event"), - Event::Ipv6Event => write!(f, "Ipv6Event"), + Event::IPv4Event => write!(f, "IPv4Event"), + Event::IPv6Event => write!(f, "IPv6Event"), - Event::TcpEvent => write!(f, "TcpEvent"), - Event::UdpEvent => write!(f, "UdpEvent"), + Event::TCPEvent => write!(f, "TCPEvent"), + Event::UDPEvent => write!(f, "UDPEvent"), - Event::TcpOpeningEvent => write!(f, "TcpOpeningEvent"), - Event::TcpActiveEvent => write!(f, "TcpActiveEvent"), - Event::TcpExpireEvent => write!(f, "TcpExpireEvent"), - Event::TcpClosedEvent => write!(f, "TcpClosedEvent"), + Event::TCPOpeningEvent => write!(f, "TCPOpeningEvent"), + Event::TCPActiveEvent => write!(f, "TCPActiveEvent"), + Event::TCPExpireEvent => write!(f, "TCPExpireEvent"), + Event::TCPClosedEvent => write!(f, "TCPClosedEvent"), - Event::UdpOpeningEvent => write!(f, "UdpOpeningEvent"), - Event::UdpActiveEvent => write!(f, "UdpActiveEvent"), - Event::UdpExpireEvent => write!(f, "UdpExpireEvent"), + Event::UDPOpeningEvent => write!(f, "UDPOpeningEvent"), + Event::UDPActiveEvent => write!(f, "UDPActiveEvent"), + Event::UDPExpireEvent => write!(f, "UDPExpireEvent"), - Event::HttpRequestEvent => write!(f, "HttpRequestEvent"), - Event::HttpResponseEvent => write!(f, "HttpResponseEvent"), + Event::HTTPRequestEvent => write!(f, "HTTPRequestEvent"), + Event::HTTPResponseEvent => write!(f, "HTTPResponseEvent"), } } } diff --git a/src/event/manager.rs b/src/event/manager.rs index 66a56ab..c28b34d 100644 --- a/src/event/manager.rs +++ b/src/event/manager.rs @@ -67,7 +67,7 @@ impl EventManager { } } - pub fn dispatch(&mut self, packet: Option<&Packet>, queue: &mut EventQueue) { + pub fn dispatch(&self, packet: Option<&Packet>, queue: &mut EventQueue) { loop { let result = queue.pop(); if let Some((event, session)) = result { diff --git a/src/main.rs b/src/main.rs index 3dcc273..38c35e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use std::cell::RefCell; use std::rc::Rc; +use std::sync::Arc; use stellar_rs::event::event::Event; use stellar_rs::event::manager::EventHandle; use stellar_rs::event::manager::EventManager; @@ -7,6 +8,7 @@ use stellar_rs::event::manager::EventQueue; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Encapsulation; use stellar_rs::packet::packet::Packet; +use stellar_rs::packet::status::PacketStatus; use stellar_rs::plugin::example::ExamplePulgin; use stellar_rs::session::session::Session; use stellar_rs::session::session::SessionProto; @@ -34,10 +36,10 @@ fn trigger_packet_event( // TODO } Encapsulation::IPv4(_, _) => { - queue.add(Event::Ipv4Event, session.clone()); + queue.add(Event::IPv4Event, session.clone()); } Encapsulation::IPv6(_, _) => { - queue.add(Event::Ipv6Event, session.clone()); + queue.add(Event::IPv6Event, session.clone()); } Encapsulation::GREv0(_, _) => { // TODO @@ -46,10 +48,10 @@ fn trigger_packet_event( // TODO } Encapsulation::TCP(_, _) => { - queue.add(Event::TcpEvent, session.clone()); + queue.add(Event::TCPEvent, session.clone()); } Encapsulation::UDP(_, _) => { - queue.add(Event::UdpEvent, session.clone()); + queue.add(Event::UDPEvent, session.clone()); } Encapsulation::ICMP(_, _) => { // TODO @@ -87,57 +89,54 @@ fn trigger_session_event(session: Option<Rc<RefCell<Session>>>, queue: &mut Even match session_state { SessionState::New => match session_proto { SessionProto::TCP => { - queue.add(Event::TcpOpeningEvent, session); + queue.add(Event::TCPOpeningEvent, session); } SessionProto::UDP => { - queue.add(Event::UdpOpeningEvent, session); + queue.add(Event::UDPOpeningEvent, session); } }, SessionState::Active => match session_proto { SessionProto::TCP => { - queue.add(Event::TcpActiveEvent, session); + queue.add(Event::TCPActiveEvent, session); } SessionProto::UDP => { - queue.add(Event::UdpActiveEvent, session); + queue.add(Event::UDPActiveEvent, session); } }, SessionState::Inactive => match session_proto { SessionProto::TCP => { - queue.add(Event::TcpClosedEvent, session); + queue.add(Event::TCPClosedEvent, session); } SessionProto::UDP => { - queue.add(Event::UdpExpireEvent, session); + queue.add(Event::UDPExpireEvent, session); } }, SessionState::Expired => match session_proto { SessionProto::TCP => { - queue.add(Event::TcpExpireEvent, session); + queue.add(Event::TCPExpireEvent, session); } SessionProto::UDP => { - queue.add(Event::UdpExpireEvent, session); + queue.add(Event::UDPExpireEvent, session); } }, } } -fn handle_one_packet(data: &[u8], len: u32, thread_ctx: &mut ThreadContext) { - let event_mgr = thread_ctx.get_event_mgr(); - let session_mgr = thread_ctx.get_session_mgr(); +fn handle_one_packet(mut packet: Packet, thread_ctx: &mut ThreadContext) { + let event_mgr = &thread_ctx.event_mgr; + let session_mgr = &mut thread_ctx.session_mgr; + let packet_metrics = &mut thread_ctx.packet_metrics; let mut queue = EventQueue::new(); - let mut packet = Packet::new(data, len); - let result = packet.handle(); match result { - Ok(_left) => { - // println!("SUCCESS: {:?}, {:?}", packet, left); - // println!("SUCCESS: {:#?}, {:?}", packet, left); - // dbg!(packet); + Ok(_) => { + packet_metrics.add(PacketStatus::Normal); + // println!("Ok Packet: {:?}", packet); } Err(e) => { - // println!("ERROR Data: {:?}", packet); - // println!("ERROR Code: {:?}", e); - println!("ERROR Desc: {}", e); + packet_metrics.add(e); + println!("Unexpected Packet: {:?} {:?}", e, packet); return; } } @@ -145,7 +144,7 @@ fn handle_one_packet(data: &[u8], len: u32, thread_ctx: &mut ThreadContext) { match packet.get_inner_most_tuple() { Some(_) => { let flow_id = packet.get_flow_id().unwrap(); - let session = session_mgr.borrow_mut().update(flow_id); + let session = session_mgr.update(flow_id); trigger_packet_event(&packet, Some(session.clone()), &mut queue); trigger_session_event(Some(session.clone()), &mut queue); } @@ -153,11 +152,11 @@ fn handle_one_packet(data: &[u8], len: u32, thread_ctx: &mut ThreadContext) { trigger_packet_event(&packet, None, &mut queue); } } - event_mgr.borrow_mut().dispatch(Some(&packet), &mut queue); + event_mgr.dispatch(Some(&packet), &mut queue); - let session = session_mgr.borrow_mut().expire_oldest_session(); + let session = session_mgr.expire_oldest_session(); trigger_session_event(session, &mut queue); - event_mgr.borrow_mut().dispatch(None, &mut queue); + event_mgr.dispatch(None, &mut queue); } fn main() { @@ -167,10 +166,23 @@ fn main() { plugin1.init(&mut event_mgr); plugin2.init(&mut event_mgr); - let event_mgr = Rc::new(RefCell::new(event_mgr)); - let mut thread_ctx = ThreadContext::new(event_mgr); + let event_mgr = Arc::new(event_mgr); + let mut thread0_ctx = ThreadContext::new(0, event_mgr.clone()); + // let mut thread1_ctx = ThreadContext::new(1, event_mgr.clone()); + + run_capture_mode("en0", &mut thread0_ctx); +} - PacketCapture::show_devices(); - let mut cap = PacketCapture::new("en0"); - cap.poll_packet(handle_one_packet, &mut thread_ctx); +fn run_capture_mode(device: &str, thread_ctx: &mut ThreadContext) { + let mut capture = PacketCapture::new(device); + loop { + match capture.next() { + Some(packet) => { + handle_one_packet(packet, thread_ctx); + } + None => { + // do something + } + } + } } diff --git a/src/packet/capture.rs b/src/packet/capture.rs index d71b1a8..645a303 100644 --- a/src/packet/capture.rs +++ b/src/packet/capture.rs @@ -1,4 +1,4 @@ -use crate::thread::thread::ThreadContext; +use crate::packet::packet::Packet; use pcap::Capture; pub struct PacketCapture { @@ -17,22 +17,14 @@ impl PacketCapture { PacketCapture { capture } } - pub fn poll_packet( - &mut self, - callback: fn(data: &[u8], len: u32, ctx: &mut ThreadContext), - ctx: &mut ThreadContext, - ) { - let mut packet_num = 0; - while let Ok(packet) = self.capture.next_packet() { - packet_num += 1; - println!("\n==================== New Packet ====================\n"); - println!("Packet[{}]->header : {:?}", packet_num, packet.header); - println!("Packet[{}]->data : {:?}", packet_num, packet.data); - callback(&packet.data, packet.header.len, ctx); + pub fn next(&mut self) -> Option<Packet> { + match self.capture.next_packet() { + Ok(packet) => Some(Packet::new(packet.data, packet.header.len)), + Err(_) => None, } } - pub fn show_devices() { + pub fn list() { for device in pcap::Device::list().expect("device list failed") { println!("{:?}", device); } diff --git a/src/packet/error.rs b/src/packet/error.rs deleted file mode 100644 index 38103ed..0000000 --- a/src/packet/error.rs +++ /dev/null @@ -1,83 +0,0 @@ -#[derive(Debug)] -pub enum PacketError { - InvalidPacketLength, - - // L2 - UnsupportEthernetType, - IncompleteEthernetFrame, - - IncompleteVLANHeader, - IncompleteMPLSHeader, - IncompletePWEthernetHeader, - - // L3 - UnsupportIPProtocol, - IncompleteIPv4Header, - IncompleteIPv6Header, - - // L3.5 - UnsupportGREVersion, - IncompleteGREHeader, - IncompleteGREv0Header, - IncompleteGREv1Header, - - // L4 - IncompleteUDPHeader, - IncompleteTCPHeader, - IncompleteICMPHeader, - IncompleteICMPv6Header, - - // L TUNNEL - UnsupportGTPVersion, - IncompleteGTPv1Header, - - UnsupportL2TPVersion, - IncompleteL2TPHeader, - - IncompletePPTPHeader, - IncompletePPPHeader, - - UnsupportPPPoEVersion, - IncompletePPPoEHeader, -} - -impl core::fmt::Display for PacketError { - fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { - match *self { - PacketError::InvalidPacketLength => write!(f, "Invalid Packet Length"), - // L2 - PacketError::UnsupportEthernetType => write!(f, "Unsupport Ethernet Type"), - PacketError::IncompleteEthernetFrame => write!(f, "Incomplete Ethernet Frame"), - - PacketError::IncompleteVLANHeader => write!(f, "Incomplete VLAN Header"), - PacketError::IncompleteMPLSHeader => write!(f, "Incomplete MPLS Header"), - PacketError::IncompletePWEthernetHeader => write!(f, "Incomplete PW Ethernet Header"), - // L3 - PacketError::UnsupportIPProtocol => write!(f, "Unsupport IP Protocol"), - PacketError::IncompleteIPv4Header => write!(f, "Incomplete IPv4 Header"), - PacketError::IncompleteIPv6Header => write!(f, "Incomplete IPv6 Header"), - // L3.5 - PacketError::UnsupportGREVersion => write!(f, "Unsupport GRE Version"), - PacketError::IncompleteGREHeader => write!(f, "Incomplete GRE Header"), - PacketError::IncompleteGREv0Header => write!(f, "Incomplete GREv0 Header"), - PacketError::IncompleteGREv1Header => write!(f, "Incomplete GREv1 Header"), - // L4 - PacketError::IncompleteUDPHeader => write!(f, "Incomplete UDP Header"), - PacketError::IncompleteTCPHeader => write!(f, "Incomplete TCP Header"), - PacketError::IncompleteICMPHeader => write!(f, "Incomplete ICMP Header"), - PacketError::IncompleteICMPv6Header => write!(f, "Incomplete ICMPv6 Header"), - // L TUNNEL - PacketError::UnsupportGTPVersion => write!(f, "Unsupport GTP Version"), - PacketError::IncompleteGTPv1Header => write!(f, "Incomplete GTPv1 Header"), - - PacketError::UnsupportL2TPVersion => write!(f, "Unsupport L2TP Version"), - PacketError::IncompleteL2TPHeader => write!(f, "Incomplete L2TP Header"), - - PacketError::IncompletePPTPHeader => write!(f, "Incomplete PPTP Header"), - PacketError::IncompletePPPHeader => write!(f, "Incomplete PPP Header"), - - PacketError::UnsupportPPPoEVersion => write!(f, "Unsupport PPPoE Version"), - PacketError::IncompletePPPoEHeader => write!(f, "Incomplete PPPoE Header"), - } - } -} diff --git a/src/packet/mod.rs b/src/packet/mod.rs index 3201aa1..f8af6a9 100644 --- a/src/packet/mod.rs +++ b/src/packet/mod.rs @@ -1,3 +1,3 @@ pub mod packet; -pub mod error; +pub mod status; pub mod capture;
\ No newline at end of file diff --git a/src/packet/packet.rs b/src/packet/packet.rs index d45c28d..b0c7bc5 100644 --- a/src/packet/packet.rs +++ b/src/packet/packet.rs @@ -1,4 +1,4 @@ -use crate::packet::error::PacketError; +use crate::packet::status::PacketStatus; use crate::protocol::codec::Decode; use crate::protocol::ethernet::EthHeader; use crate::protocol::ethernet::EthType; @@ -65,9 +65,9 @@ impl Packet<'_> { } } - pub fn handle(&mut self) -> Result<(), PacketError> { + pub fn handle(&mut self) -> Result<(), PacketStatus> { if self.orig_data.len() != self.orig_len as usize { - Err(PacketError::InvalidPacketLength) + Err(PacketStatus::InvalidPacketLength) } else { handle_eth(self, self.orig_data) } @@ -367,7 +367,7 @@ impl Packet<'_> { } } -fn handle_eth<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_eth<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = EthHeader::decode(input); match result { Ok((payload, header)) => { @@ -380,11 +380,11 @@ fn handle_eth<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packet handle_l3(packet, payload, next_proto) } - _ => Err(PacketError::IncompleteEthernetFrame), + _ => Err(PacketStatus::IncompleteEthernetFrame), } } -fn handle_vlan<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_vlan<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = VLANHeader::decode(input); match result { Ok((payload, header)) => { @@ -397,11 +397,11 @@ fn handle_vlan<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe handle_l3(packet, payload, next_proto) } - _ => Err(PacketError::IncompleteVLANHeader), + _ => Err(PacketStatus::IncompleteVLANHeader), } } -fn handle_mpls<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_mpls<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = MPLSHeader::decode(input); match result { Ok((payload, header)) => { @@ -429,11 +429,11 @@ fn handle_mpls<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe false => handle_mpls(packet, payload), } } - _ => Err(PacketError::IncompleteMPLSHeader), + _ => Err(PacketStatus::IncompleteMPLSHeader), } } -fn handle_pw_eth<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_pw_eth<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = PWEthHeader::decode(input); match result { Ok((payload, header)) => { @@ -445,11 +445,11 @@ fn handle_pw_eth<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Pac handle_eth(packet, payload) } - _ => Err(PacketError::IncompletePWEthernetHeader), + _ => Err(PacketStatus::IncompletePWEthernetHeader), } } -fn handle_ipv4<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_ipv4<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = IPv4Header::decode(input); match result { Ok((payload, header)) => { @@ -464,11 +464,11 @@ fn handle_ipv4<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe handle_l4(packet, payload, next_proto) } - _ => Err(PacketError::IncompleteIPv4Header), + _ => Err(PacketStatus::IncompleteIPv4Header), } } -fn handle_ipv6<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_ipv6<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = IPv6Header::decode(input); match result { Ok((payload, header)) => { @@ -489,11 +489,11 @@ fn handle_ipv6<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe handle_l4(packet, payload, next_proto) } - _ => Err(PacketError::IncompleteIPv6Header), + _ => Err(PacketStatus::IncompleteIPv6Header), } } -fn handle_tcp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_tcp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = TCPHeader::decode(input); match result { Ok((payload, header)) => { @@ -517,11 +517,11 @@ fn handle_tcp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packet _ => Ok(()), } } - _ => Err(PacketError::IncompleteTCPHeader), + _ => Err(PacketStatus::IncompleteTCPHeader), } } -fn handle_udp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_udp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = UDPHeader::decode(input); match result { Ok((payload, header)) => { @@ -538,11 +538,11 @@ fn handle_udp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packet _ => Ok(()), } } - _ => Err(PacketError::IncompleteUDPHeader), + _ => Err(PacketStatus::IncompleteUDPHeader), } } -fn handle_icmp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_icmp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = ICMPHeader::decode(input); match result { Ok((payload, header)) => { @@ -554,11 +554,11 @@ fn handle_icmp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe Ok(()) } - _ => Err(PacketError::IncompleteICMPHeader), + _ => Err(PacketStatus::IncompleteICMPHeader), } } -fn handle_icmpv6<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_icmpv6<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = ICMPv6Header::decode(input); match result { Ok((payload, header)) => { @@ -570,11 +570,11 @@ fn handle_icmpv6<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Pac Ok(()) } - _ => Err(PacketError::IncompleteICMPv6Header), + _ => Err(PacketStatus::IncompleteICMPv6Header), } } -fn handle_gtpv1<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_gtpv1<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = GTPv1Header::decode(input); match result { Ok((payload, header)) => { @@ -595,12 +595,12 @@ fn handle_gtpv1<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Pack _ => Ok(()), } } - Err(Incomplete(_)) => Err(PacketError::IncompleteGTPv1Header), - _ => Err(PacketError::UnsupportGTPVersion), + Err(Incomplete(_)) => Err(PacketStatus::IncompleteGTPv1Header), + _ => Err(PacketStatus::UnsupportGTPVersion), } } -fn handle_l2tp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_l2tp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = L2TPHeader::decode(input); match result { Ok((payload, header)) => { @@ -616,14 +616,14 @@ fn handle_l2tp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe L2TPType::Data => handle_ppp(packet, payload), } } - Err(Incomplete(_)) => Err(PacketError::IncompleteL2TPHeader), - _ => Err(PacketError::UnsupportL2TPVersion), + Err(Incomplete(_)) => Err(PacketStatus::IncompleteL2TPHeader), + _ => Err(PacketStatus::UnsupportL2TPVersion), } } -fn handle_gre<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_gre<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { if input.len() < 2 { - return Err(PacketError::IncompleteGREHeader); + return Err(PacketStatus::IncompleteGREHeader); } let version = input[1] & 0x07; @@ -641,7 +641,7 @@ fn handle_gre<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packet handle_l3(packet, payload, next_proto) } - _ => Err(PacketError::IncompleteGREv0Header), + _ => Err(PacketStatus::IncompleteGREv0Header), } } 1 => { @@ -657,14 +657,14 @@ fn handle_gre<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packet handle_l3(packet, payload, next_proto) } - _ => Err(PacketError::IncompleteGREv1Header), + _ => Err(PacketStatus::IncompleteGREv1Header), } } - _ => Err(PacketError::UnsupportGREVersion), + _ => Err(PacketStatus::UnsupportGREVersion), } } -fn handle_pptp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_pptp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = PPTPHeader::decode(input); match result { Ok((payload, header)) => { @@ -676,11 +676,11 @@ fn handle_pptp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packe Ok(()) } - _ => Err(PacketError::IncompletePPTPHeader), + _ => Err(PacketStatus::IncompletePPTPHeader), } } -fn handle_ppp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_ppp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = PPPHeader::decode(input); match result { Ok((payload, header)) => { @@ -703,11 +703,11 @@ fn handle_ppp<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Packet _ => Ok(()), } } - _ => Err(PacketError::IncompletePPPHeader), + _ => Err(PacketStatus::IncompletePPPHeader), } } -fn handle_pppoe<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketError> { +fn handle_pppoe<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), PacketStatus> { let result = PPPoEHeader::decode(input); match result { Ok((payload, header)) => { @@ -737,8 +737,8 @@ fn handle_pppoe<'a>(packet: &mut Packet<'a>, input: &'a [u8]) -> Result<(), Pack None => Ok(()), } } - Err(Incomplete(_)) => Err(PacketError::IncompletePPPoEHeader), - _ => Err(PacketError::UnsupportPPPoEVersion), + Err(Incomplete(_)) => Err(PacketStatus::IncompletePPPoEHeader), + _ => Err(PacketStatus::UnsupportPPPoEVersion), } } @@ -746,7 +746,7 @@ fn handle_l3<'a>( packet: &mut Packet<'a>, input: &'a [u8], next_proto: EthType, -) -> Result<(), PacketError> { +) -> Result<(), PacketStatus> { match next_proto { EthType::PPPoEdiscovery => handle_pppoe(packet, input), EthType::PPPoEsession => handle_pppoe(packet, input), @@ -756,7 +756,7 @@ fn handle_l3<'a>( EthType::VLAN => handle_vlan(packet, input), EthType::IPv4 => handle_ipv4(packet, input), EthType::IPv6 => handle_ipv6(packet, input), - _e => Err(PacketError::UnsupportEthernetType), + other => Err(PacketStatus::UnsupportEthernetType(other)), } } @@ -764,7 +764,7 @@ fn handle_l4<'a>( packet: &mut Packet<'a>, input: &'a [u8], next_proto: IPProtocol, -) -> Result<(), PacketError> { +) -> Result<(), PacketStatus> { match next_proto { IPProtocol::GRE => handle_gre(packet, input), IPProtocol::IPINIP => handle_ipv4(packet, input), @@ -773,7 +773,7 @@ fn handle_l4<'a>( IPProtocol::ICMP6 => handle_icmpv6(packet, input), IPProtocol::UDP => handle_udp(packet, input), IPProtocol::TCP => handle_tcp(packet, input), - _e => Err(PacketError::UnsupportIPProtocol), + other => Err(PacketStatus::UnsupportIPProtocol(other)), } } diff --git a/src/packet/status.rs b/src/packet/status.rs new file mode 100644 index 0000000..25585c0 --- /dev/null +++ b/src/packet/status.rs @@ -0,0 +1,74 @@ +use crate::protocol::ethernet::EthType; +use crate::protocol::ip::IPProtocol; +use std::collections::HashMap; + +#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq)] +pub enum PacketStatus { + Normal, + + InvalidPacketLength, + + // L2 + UnsupportEthernetType(EthType), + IncompleteEthernetFrame, + + IncompleteVLANHeader, + IncompleteMPLSHeader, + IncompletePWEthernetHeader, + + // L3 + UnsupportIPProtocol(IPProtocol), + IncompleteIPv4Header, + IncompleteIPv6Header, + + // L3.5 + UnsupportGREVersion, + IncompleteGREHeader, + IncompleteGREv0Header, + IncompleteGREv1Header, + + // L4 + IncompleteUDPHeader, + IncompleteTCPHeader, + IncompleteICMPHeader, + IncompleteICMPv6Header, + + // L TUNNEL + UnsupportGTPVersion, + IncompleteGTPv1Header, + + UnsupportL2TPVersion, + IncompleteL2TPHeader, + + IncompletePPTPHeader, + IncompletePPPHeader, + + UnsupportPPPoEVersion, + IncompletePPPoEHeader, +} + +#[derive(Debug)] +pub struct PacketMetrics { + map: HashMap<PacketStatus, usize>, +} + +impl PacketMetrics { + pub fn new() -> PacketMetrics { + PacketMetrics { + map: HashMap::new(), + } + } + + pub fn add(&mut self, err: PacketStatus) { + let count = self.map.entry(err).or_insert(0); + *count += 1; + } + + pub fn get(&self, err: PacketStatus) -> Option<&usize> { + self.map.get(&err) + } + + pub fn dump(&self) { + println!("PacketMetrics: {:?}", self.map); + } +} diff --git a/src/plugin/example.rs b/src/plugin/example.rs index d1f6b46..7a4d4ec 100644 --- a/src/plugin/example.rs +++ b/src/plugin/example.rs @@ -24,11 +24,11 @@ impl ExamplePulgin { impl EventHandle for ExamplePulgin { fn init(&mut self, event_mgr: &mut EventManager) { - event_mgr.register(Event::TcpOpeningEvent, Box::new(self.clone())); - event_mgr.register(Event::TcpActiveEvent, Box::new(self.clone())); - event_mgr.register(Event::TcpExpireEvent, Box::new(self.clone())); - event_mgr.register(Event::TcpClosedEvent, Box::new(self.clone())); - event_mgr.register(Event::HttpRequestEvent, Box::new(self.clone())); + event_mgr.register(Event::TCPOpeningEvent, Box::new(self.clone())); + event_mgr.register(Event::TCPActiveEvent, Box::new(self.clone())); + event_mgr.register(Event::TCPExpireEvent, Box::new(self.clone())); + event_mgr.register(Event::TCPClosedEvent, Box::new(self.clone())); + event_mgr.register(Event::HTTPRequestEvent, Box::new(self.clone())); } fn handle( @@ -51,26 +51,26 @@ impl EventHandle for ExamplePulgin { let session = session.unwrap(); session.borrow_mut().inc_session_c2s_metrics(0, 0, 1, 1); match event { - Event::TcpOpeningEvent => { - println!("{} handle TcpOpeningEvent: {:?}", self.plugin_name, session); + Event::TCPOpeningEvent => { + println!("{} handle TCPOpeningEvent: {:?}", self.plugin_name, session); let (src_port, dst_port) = packet.unwrap().get_inner_most_port().unwrap(); if src_port == 80 || dst_port == 80 { - println!("{} add HttpRequestEvent", self.plugin_name); - queue.add(Event::HttpRequestEvent, Some(session)); + println!("{} add HTTPRequestEvent", self.plugin_name); + queue.add(Event::HTTPRequestEvent, Some(session)); } } - Event::TcpActiveEvent => { - println!("{} handle TcpActiveEvent: {:?}", self.plugin_name, session); + Event::TCPActiveEvent => { + println!("{} handle TCPActiveEvent: {:?}", self.plugin_name, session); } - Event::TcpExpireEvent => { - println!("{} handle TcpExpireEvent: {:?}", self.plugin_name, session); + Event::TCPExpireEvent => { + println!("{} handle TCPExpireEvent: {:?}", self.plugin_name, session); } - Event::TcpClosedEvent => { - println!("{} handle TcpClosedEvent: {:?}", self.plugin_name, session); + Event::TCPClosedEvent => { + println!("{} handle TCPClosedEvent: {:?}", self.plugin_name, session); } - Event::HttpRequestEvent => { + Event::HTTPRequestEvent => { println!( - "{} handle HttpRequestEvent: {:?}", + "{} handle HTTPRequestEvent: {:?}", self.plugin_name, session ); } diff --git a/src/protocol/dns.rs b/src/protocol/dns.rs index 3b39af0..0f62b49 100644 --- a/src/protocol/dns.rs +++ b/src/protocol/dns.rs @@ -794,15 +794,11 @@ impl DNSResourceRecordData { )) } (_, DNSQtype::HINFO) => { - let (input, _cpu) = multi::length_data(number::streaming::be_u8)(input)?; - let (input, _os) = multi::length_data(number::streaming::be_u8)(input)?; - Ok(( - input, - DNSResourceRecordData::HINFO { - cpu: _cpu.to_vec(), - os: _os.to_vec(), - }, - )) + let (input, cpu) = multi::length_data(number::streaming::be_u8)(input) + .map(|(i, l)| (i, l.to_vec()))?; + let (input, os) = multi::length_data(number::streaming::be_u8)(input) + .map(|(i, l)| (i, l.to_vec()))?; + Ok((input, DNSResourceRecordData::HINFO { cpu, os })) } (_, DNSQtype::MX) => { let (input, _preference) = number::streaming::be_u16(input)?; diff --git a/src/protocol/ethernet.rs b/src/protocol/ethernet.rs index 396dc22..f8e39f8 100644 --- a/src/protocol/ethernet.rs +++ b/src/protocol/ethernet.rs @@ -9,7 +9,7 @@ use nom::IResult; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct MacAddress(pub [u8; 6]); -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Hash, Debug, PartialEq, Eq)] pub enum EthType { LANMIN, // 802.3 Min data length LANMAX, // 802.3 Max data length @@ -130,9 +130,9 @@ impl From<u16> for EthType { impl Decode for EthType { type Iterm = EthType; fn decode(input: &[u8]) -> IResult<&[u8], EthType> { - let (input, ether_type) = number::streaming::be_u16(input)?; + let (input, ether_type) = number::streaming::be_u16(input).map(|(i, l)| (i, l.into()))?; - Ok((input, ether_type.into())) + Ok((input, ether_type)) } } diff --git a/src/protocol/gtpv1.rs b/src/protocol/gtpv1.rs index bc01b41..8a7fc4d 100644 --- a/src/protocol/gtpv1.rs +++ b/src/protocol/gtpv1.rs @@ -79,13 +79,14 @@ fn extension_decode(input: &[u8]) -> IResult<&[u8], GTPv1ExtensionHeader> { (2 - length * 4).into(), ))); } - let (input, contents) = bytes::streaming::take(length * 4 - 2)(input)?; + let (input, contents) = + bytes::streaming::take(length * 4 - 2)(input).map(|(i, l)| (i, l.to_vec()))?; let (input, next_header_type) = number::streaming::be_u8(input)?; Ok(( input, GTPv1ExtensionHeader { length, - contents: contents.to_vec(), + contents, next_header_type, }, )) diff --git a/src/protocol/icmp.rs b/src/protocol/icmp.rs index 991a191..99d0f2d 100644 --- a/src/protocol/icmp.rs +++ b/src/protocol/icmp.rs @@ -66,18 +66,19 @@ impl From<u8> for ICMPType { impl Decode for ICMPHeader { type Iterm = ICMPHeader; fn decode(input: &[u8]) -> IResult<&[u8], ICMPHeader> { - let (input, icmp_type) = number::streaming::be_u8(input)?; + let (input, icmp_type) = number::streaming::be_u8(input).map(|(i, l)| (i, l.into()))?; let (input, icmp_code) = number::streaming::be_u8(input)?; let (input, icmp_checksum) = number::streaming::be_u16(input)?; - let (input, icmp_extended) = nom::bytes::streaming::take(4u8)(input)?; + let (input, icmp_extended) = + nom::bytes::streaming::take(4u8)(input).map(|(i, l)| (i, l.to_vec()))?; Ok(( input, ICMPHeader { - icmp_type: icmp_type.into(), + icmp_type, icmp_code, icmp_checksum, - icmp_extended: icmp_extended.to_vec(), + icmp_extended, }, )) } diff --git a/src/protocol/ip.rs b/src/protocol/ip.rs index 1ff15e3..436fabb 100644 --- a/src/protocol/ip.rs +++ b/src/protocol/ip.rs @@ -6,7 +6,7 @@ use nom::IResult; * Struct ******************************************************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub enum IPProtocol { IPV6HOP, ICMP, @@ -92,8 +92,8 @@ impl IPProtocol { impl Decode for IPProtocol { type Iterm = IPProtocol; fn decode(input: &[u8]) -> IResult<&[u8], IPProtocol> { - let (input, protocol) = number::streaming::be_u8(input)?; + let (input, protocol) = number::streaming::be_u8(input).map(|(i, l)| (i, l.into()))?; - Ok((input, protocol.into())) + Ok((input, protocol)) } } diff --git a/src/protocol/ipv6.rs b/src/protocol/ipv6.rs index 8ee78c3..11666d4 100644 --- a/src/protocol/ipv6.rs +++ b/src/protocol/ipv6.rs @@ -113,14 +113,15 @@ fn extension_decode(input: &[u8], curr_proto: IPProtocol) -> IResult<&[u8], IPv6 } else { ext_length = ext_length * 8 + 8; // update other extension header length } - let (input, data) = bytes::streaming::take(ext_length - 2)(input)?; + let (input, data) = + bytes::streaming::take(ext_length - 2)(input).map(|(i, l)| (i, l.to_vec()))?; Ok(( input, IPv6Extension { next_header, ext_length, - data: data.to_vec(), + data, }, )) } diff --git a/src/protocol/l2tp.rs b/src/protocol/l2tp.rs index 90c5ea5..445062f 100644 --- a/src/protocol/l2tp.rs +++ b/src/protocol/l2tp.rs @@ -294,7 +294,7 @@ fn avp_decode(input: &[u8]) -> IResult<&[u8], L2TPAVPHeader> { bits::streaming::take(10u8), )))(input)?; let (input, vendor_id) = number::streaming::be_u16(input)?; - let (input, attribute_type) = number::streaming::be_u16(input)?; + let (input, attribute_type) = number::streaming::be_u16(input).map(|(i, l)| (i, l.into()))?; /* * Length: Encodes the number of octets (including the Overall Length * and bitmask fields) contained in this AVP. The Length may be @@ -306,7 +306,8 @@ fn avp_decode(input: &[u8]) -> IResult<&[u8], L2TPAVPHeader> { if length < 6 { return Err(nom::Err::Incomplete(nom::Needed::new((6 - length).into()))); } - let (input, attribute_value) = bytes::streaming::take(length - 6)(input)?; + let (input, attribute_value) = + bytes::streaming::take(length - 6)(input).map(|(i, l)| (i, l.to_vec()))?; Ok(( input, @@ -316,8 +317,8 @@ fn avp_decode(input: &[u8]) -> IResult<&[u8], L2TPAVPHeader> { reserved, length, vendor_id, - attribute_type: attribute_type.into(), - attribute_value: attribute_value.to_vec(), + attribute_type, + attribute_value, }, )) } diff --git a/src/protocol/ppp.rs b/src/protocol/ppp.rs index 97fa0e5..1c08ce7 100644 --- a/src/protocol/ppp.rs +++ b/src/protocol/ppp.rs @@ -51,9 +51,9 @@ impl From<u16> for PPPProtocol { impl Decode for PPPProtocol { type Iterm = PPPProtocol; fn decode(input: &[u8]) -> IResult<&[u8], PPPProtocol> { - let (input, protocol) = number::streaming::be_u16(input)?; + let (input, protocol) = number::streaming::be_u16(input).map(|(i, l)| (i, l.into()))?; - Ok((input, protocol.into())) + Ok((input, protocol)) } } diff --git a/src/protocol/pppoe.rs b/src/protocol/pppoe.rs index fbd8448..e63dbb1 100644 --- a/src/protocol/pppoe.rs +++ b/src/protocol/pppoe.rs @@ -118,15 +118,16 @@ fn version_type_decode(input: &[u8]) -> IResult<&[u8], (u8, u8)> { } fn pppoe_tag_decode(input: &[u8]) -> IResult<&[u8], PPPoETag> { - let (input, tag_type) = number::complete::be_u16(input)?; + let (input, tag_type) = number::complete::be_u16(input).map(|(i, l)| (i, l.into()))?; let (input, tag_length) = number::complete::be_u16(input)?; - let (input, tag_value) = nom::bytes::complete::take(tag_length)(input)?; + let (input, tag_value) = + nom::bytes::complete::take(tag_length)(input).map(|(i, l)| (i, l.to_vec()))?; Ok(( input, PPPoETag { - tag_type: tag_type.into(), + tag_type, tag_length, - tag_value: tag_value.to_vec(), + tag_value, }, )) } @@ -165,10 +166,10 @@ impl Decode for PPPoEHeader { ))) } } - let (input, code) = number::complete::be_u8(input)?; + let (input, code) = number::complete::be_u8(input).map(|(i, l)| (i, l.into()))?; let (input, session_id) = number::complete::be_u16(input)?; let (input, payload_length) = number::complete::be_u16(input)?; - let (input, stage) = match code.into() { + let (input, stage) = match code { PPPoECode::SessionData => { let (input, ppp_protocol) = PPPProtocol::decode(input)?; (input, PPPoEStage::Session(ppp_protocol)) @@ -184,7 +185,7 @@ impl Decode for PPPoEHeader { PPPoEHeader { version, type_, - code: code.into(), + code, session_id, payload_length, stage, diff --git a/src/protocol/pptp.rs b/src/protocol/pptp.rs index ed64a26..c9e2b40 100644 --- a/src/protocol/pptp.rs +++ b/src/protocol/pptp.rs @@ -107,9 +107,10 @@ impl Decode for PPTPHeader { type Iterm = PPTPHeader; fn decode(input: &[u8]) -> IResult<&[u8], PPTPHeader> { let (input, length) = number::streaming::be_u16(input)?; - let (input, message_type) = number::streaming::be_u16(input)?; + let (input, message_type) = number::streaming::be_u16(input).map(|(i, l)| (i, l.into()))?; let (input, magic_cookie) = number::streaming::be_u32(input)?; - let (input, control_message_type) = number::streaming::be_u16(input)?; + let (input, control_message_type) = + number::streaming::be_u16(input).map(|(i, l)| (i, l.into()))?; let (input, reserved0) = number::streaming::be_u16(input)?; let need = length - 12; @@ -121,9 +122,9 @@ impl Decode for PPTPHeader { let header = PPTPHeader { length, - message_type: message_type.into(), + message_type, magic_cookie, - control_message_type: control_message_type.into(), + control_message_type, reserved0, payload, }; diff --git a/src/thread/thread.rs b/src/thread/thread.rs index fea8426..7887343 100644 --- a/src/thread/thread.rs +++ b/src/thread/thread.rs @@ -1,26 +1,22 @@ use crate::event::manager::EventManager; +use crate::packet::status::PacketMetrics; use crate::session::manager::SessionManager; -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::Arc; pub struct ThreadContext { - event_mgr: Rc<RefCell<EventManager>>, - session_mgr: Rc<RefCell<SessionManager>>, + pub thread_id: usize, + pub event_mgr: Arc<EventManager>, + pub session_mgr: SessionManager, + pub packet_metrics: PacketMetrics, } impl ThreadContext { - pub fn new(event_mgr: Rc<RefCell<EventManager>>) -> Self { + pub fn new(thread_id: usize, event_mgr: Arc<EventManager>) -> Self { ThreadContext { + thread_id, event_mgr, - session_mgr: Rc::new(RefCell::new(SessionManager::new(4096))), + session_mgr: SessionManager::new(4096), + packet_metrics: PacketMetrics::new(), } } - - pub fn get_event_mgr(&self) -> Rc<RefCell<EventManager>> { - self.event_mgr.clone() - } - - pub fn get_session_mgr(&self) -> Rc<RefCell<SessionManager>> { - self.session_mgr.clone() - } } |
