use std::cell::RefCell; use std::rc::Rc; use stellar_rs::event::event::Event; use stellar_rs::event::manager::EventHandle; use stellar_rs::event::manager::EventManager; use stellar_rs::event::manager::EventQueue; use stellar_rs::packet::marsio::PcapImpl; use stellar_rs::packet::packet::Encapsulation; use stellar_rs::packet::packet::Packet; use stellar_rs::plugin::example::ExamplePulgin; use stellar_rs::session::session::Session; use stellar_rs::session::session::SessionProto; use stellar_rs::session::session::SessionState; use stellar_rs::thread::thread::ThreadContex; fn trigger_packet_event( packet: &Packet, session: Option>>, queue: &mut EventQueue, ) { let num = packet.encapsulation.len(); for i in 0..num { match packet.encapsulation[i] { Encapsulation::Eth(_, _) => { // TODO } Encapsulation::Vlan(_, _) => { // TODO } Encapsulation::Mpls(_, _) => { // TODO } Encapsulation::PwEth(_, _) => { // TODO } Encapsulation::Ipv4(_, _) => { queue.add(Event::Ipv4Event, session.clone()); } Encapsulation::Ipv6(_, _) => { queue.add(Event::Ipv6Event, session.clone()); } Encapsulation::Tcp(_, _) => { queue.add(Event::TcpEvent, session.clone()); } Encapsulation::Udp(_, _) => { queue.add(Event::UdpEvent, session.clone()); } Encapsulation::Icmp(_, _) => { // TODO } Encapsulation::Icmpv6(_, _) => { // TODO } Encapsulation::Gtpv1(_, _) => { // TODO } Encapsulation::L2tp(_, _) => { // TODO } } } } fn trigger_session_event(session: Option>>, queue: &mut EventQueue) { if session.is_none() { return; } let session_state = session.clone().unwrap().borrow_mut().get_session_state(); let session_proto = session.clone().unwrap().borrow_mut().get_session_proto(); match session_state { SessionState::New => match session_proto { SessionProto::TCP => { queue.add(Event::TcpOpeningEvent, session); } SessionProto::UDP => { queue.add(Event::UdpOpeningEvent, session); } }, SessionState::Active => match session_proto { SessionProto::TCP => { queue.add(Event::TcpActiveEvent, session); } SessionProto::UDP => { queue.add(Event::UdpActiveEvent, session); } }, SessionState::Inactive => match session_proto { SessionProto::TCP => { queue.add(Event::TcpClosedEvent, session); } SessionProto::UDP => { queue.add(Event::UdpExpireEvent, session); } }, SessionState::Expired => match session_proto { SessionProto::TCP => { queue.add(Event::TcpExpireEvent, session); } SessionProto::UDP => { queue.add(Event::UdpExpireEvent, session); } }, } } fn handle_one_packet(data: &[u8], len: u32, ctx: Rc>) { let event_mgr = ctx.borrow().get_event_mgr(); let session_mgr = ctx.borrow().get_session_mgr(); let mut queue = EventQueue::new(); let mut packet = Packet::new(data, len); let result = packet.handle(); match result { Ok(_left) => { // println!("SUCCESS: {:?}, {:?}", packet, left); // println!("SUCCESS: {:#?}, {:?}", packet, left); // dbg!(packet); } Err(e) => { // println!("ERROR Data: {:?}", packet); // println!("ERROR Code: {:?}", e); println!("ERROR Desc: {}", e); return; } } if packet.get_inner_tuple().is_some() { let flow_id = packet.get_flow_id().unwrap(); let session = session_mgr.borrow_mut().update(flow_id); trigger_packet_event(&packet, Some(session.clone()), &mut queue); trigger_session_event(Some(session.clone()), &mut queue); } else { trigger_packet_event(&packet, None, &mut queue); } event_mgr.borrow_mut().dispatch(Some(&packet), &mut queue); let session = session_mgr.borrow_mut().expire_oldest_session(); trigger_session_event(session, &mut queue); event_mgr.borrow_mut().dispatch(None, &mut queue); } fn main() { let mut event_mgr = EventManager::new(); let mut plugin1 = ExamplePulgin::new("Plugin1"); let mut plugin2 = ExamplePulgin::new("Plugin2"); plugin1.init(&mut event_mgr); plugin2.init(&mut event_mgr); let event_mgr = Rc::new(RefCell::new(event_mgr)); let thread_ctx = Rc::new(RefCell::new(ThreadContex::new(event_mgr))); PcapImpl::show_devices(); let mut cap = PcapImpl::new("en0"); cap.poll_packet(handle_one_packet, thread_ctx); }