diff options
| author | luwenpeng <[email protected]> | 2023-09-18 14:47:10 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2023-09-18 17:52:39 +0800 |
| commit | cb674f9e168b6e709136e17a5bc87d3925c6f479 (patch) | |
| tree | a735d755c1b653ae59b0b47d05ed720aa573f0b6 /src/main.rs | |
| parent | 9387c343d38c00efb432cfb419a3c669f4d65b3a (diff) | |
[refactor] Event manager: Support triggering new events in event handle, Solve the problem of multiple borrowing
Diffstat (limited to 'src/main.rs')
| -rw-r--r-- | src/main.rs | 113 |
1 files changed, 62 insertions, 51 deletions
diff --git a/src/main.rs b/src/main.rs index 49a0728..81ef6f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ use std::cell::RefCell; use std::rc::Rc; -use stellar_rs::event::event::BuiltInEvent; +use stellar_rs::event::event::Event; use stellar_rs::event::manager::EventHandle; use stellar_rs::event::manager::EventManager; +use stellar_rs::event::manager::EventQueue; use stellar_rs::packet::capture::PacketCapture; use stellar_rs::packet::packet::Encapsulation; use stellar_rs::packet::packet::Packet; @@ -12,61 +13,55 @@ use stellar_rs::session::session::SessionProto; use stellar_rs::session::session::SessionState; use stellar_rs::thread::thread::ThreadContex; -fn trigger_event_by_packet( +fn trigger_packet_event( packet: &Packet, session: Option<Rc<RefCell<Session>>>, - event_mgr: Rc<RefCell<EventManager>>, + queue: &mut EventQueue, ) { let num = packet.encapsulation.len(); for i in 0..num { match packet.encapsulation[i] { - Encapsulation::L2_ETH(_, _) => { + Encapsulation::Eth(_, _) => { // TODO } - Encapsulation::L2_VLAN(_, _) => { + Encapsulation::Vlan(_, _) => { // TODO } - Encapsulation::L2_MPLS(_, _) => { + Encapsulation::Mpls(_, _) => { // TODO } - Encapsulation::L2_PWETH(_, _) => { + Encapsulation::PwEth(_, _) => { // TODO } - Encapsulation::L3_IPV4(_, _) => { - BuiltInEvent::trigger_ipv4_event(event_mgr.clone(), session.clone()); + Encapsulation::Ipv4(_, _) => { + queue.add(Event::Ipv4Event, session.clone()); } - Encapsulation::L3_IPV6(_, _) => { - BuiltInEvent::trigger_ipv6_event(event_mgr.clone(), session.clone()); + Encapsulation::Ipv6(_, _) => { + queue.add(Event::Ipv6Event, session.clone()); } - Encapsulation::L4_TCP(_, _) => { - BuiltInEvent::trigger_tcp_event(event_mgr.clone(), session.clone()); + Encapsulation::Tcp(_, _) => { + queue.add(Event::TcpEvent, session.clone()); } - Encapsulation::L4_UDP(_, _) => { - BuiltInEvent::trigger_udp_event(event_mgr.clone(), session.clone()); + Encapsulation::Udp(_, _) => { + queue.add(Event::UdpEvent, session.clone()); } - Encapsulation::L4_ICMP(_, _) => { + Encapsulation::Icmp(_, _) => { // TODO } - Encapsulation::L4_ICMPV6(_, _) => { + Encapsulation::Icmpv6(_, _) => { // TODO } - Encapsulation::LTUN_GTPV1_C(_, _) => { + Encapsulation::Gtpv1(_, _) => { // TODO } - Encapsulation::LTUN_L2TP(_, _) => { - // TODO - } - Encapsulation::UNSUPPORTED(_) => { + Encapsulation::L2tp(_, _) => { // TODO } } } } -fn trigger_event_by_session( - session: Option<Rc<RefCell<Session>>>, - event_mgr: Rc<RefCell<EventManager>>, -) { +fn trigger_session_event(session: Option<Rc<RefCell<Session>>>, queue: &mut EventQueue) { if session.is_none() { return; } @@ -76,31 +71,47 @@ fn trigger_event_by_session( match session_state { SessionState::New => match session_proto { - SessionProto::TCP => BuiltInEvent::trigger_tcp_opening_event(event_mgr, session), - SessionProto::UDP => BuiltInEvent::trigger_udp_opening_event(event_mgr, session), + SessionProto::TCP => { + queue.add(Event::TcpOpeningEvent, session); + } + SessionProto::UDP => { + queue.add(Event::UdpOpeningEvent, session); + } }, SessionState::Active => match session_proto { - SessionProto::TCP => BuiltInEvent::trigger_tcp_active_event(event_mgr, session), - SessionProto::UDP => BuiltInEvent::trigger_udp_active_event(event_mgr, session), + SessionProto::TCP => { + queue.add(Event::TcpActiveEvent, session); + } + SessionProto::UDP => { + queue.add(Event::UdpActiveEvent, session); + } }, SessionState::Inactive => match session_proto { - SessionProto::TCP => BuiltInEvent::trigger_tcp_closed_event(event_mgr, session), - SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session), + SessionProto::TCP => { + queue.add(Event::TcpClosedEvent, session); + } + SessionProto::UDP => { + queue.add(Event::UdpExpireEvent, session); + } }, SessionState::Expired => match session_proto { - SessionProto::TCP => BuiltInEvent::trigger_tcp_expire_event(event_mgr, session), - SessionProto::UDP => BuiltInEvent::trigger_udp_expire_event(event_mgr, session), + SessionProto::TCP => { + queue.add(Event::TcpExpireEvent, session); + } + SessionProto::UDP => { + queue.add(Event::UdpExpireEvent, session); + } }, } } fn handle_one_packet(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { - let event_mgr = ctx.borrow_mut().get_event_mgr(); - let session_mgr = ctx.borrow_mut().get_session_mgr(); + let event_mgr = ctx.borrow().get_event_mgr(); + let session_mgr = ctx.borrow().get_session_mgr(); + let mut queue = EventQueue::new(); let mut packet = Packet::new(data, len); - // Handle Packet let result = packet.handle(); match result { Ok(_left) => { @@ -116,30 +127,30 @@ fn handle_one_packet(data: &[u8], len: u32, ctx: Rc<RefCell<ThreadContex>>) { } } - // Packets have sessions, triggering both packet events and session events if packet.get_inner_tuple().is_some() { let flow_id = packet.get_flow_id().unwrap(); let session = session_mgr.borrow_mut().update(flow_id); - trigger_event_by_packet(&packet, Some(session.clone()), event_mgr.clone()); - trigger_event_by_session(Some(session.clone()), event_mgr.clone()); - // Packets have no sessions, only packet events are triggered + trigger_packet_event(&packet, Some(session.clone()), &mut queue); + trigger_session_event(Some(session.clone()), &mut queue); } else { - trigger_event_by_packet(&packet, None, event_mgr.clone()); + trigger_packet_event(&packet, None, &mut queue); } + event_mgr.borrow_mut().dispatch(Some(&packet), &mut queue); - // Handle packet events and session events on the current package - event_mgr.borrow_mut().dispatch(Some(&packet)); - - // Handle expire events without packets let session = session_mgr.borrow_mut().expire_oldest_session(); - trigger_event_by_session(session, event_mgr.clone()); - event_mgr.borrow_mut().dispatch(None); + trigger_session_event(session, &mut queue); + event_mgr.borrow_mut().dispatch(None, &mut queue); } fn main() { - let thread_ctx = Rc::new(RefCell::new(ThreadContex::new())); - let mut plugin = ExamplePulgin::new("Example Plugin", thread_ctx.clone()); - plugin.init(); + let mut event_mgr = EventManager::new(); + let mut plugin1 = ExamplePulgin::new("Plugin1"); + let mut plugin2 = ExamplePulgin::new("Plugin2"); + plugin1.init(&mut event_mgr); + plugin2.init(&mut event_mgr); + + let event_mgr = Rc::new(RefCell::new(event_mgr)); + let thread_ctx = Rc::new(RefCell::new(ThreadContex::new(event_mgr))); PacketCapture::show_devices(); let mut cap = PacketCapture::new("en0"); |
